From 56bfc43118197931360db68a2cf38858df7b4ee5 Mon Sep 17 00:00:00 2001 From: WithoutPants <53250216+WithoutPants@users.noreply.github.com> Date: Mon, 16 Mar 2026 17:43:11 +1100 Subject: [PATCH] Refactor scan to use separate job for generation --- internal/manager/manager_tasks.go | 126 +++++++++++-- internal/manager/task_generate_preview.go | 5 + internal/manager/task_scan.go | 212 ++++++++++------------ pkg/models/enums.go | 67 +++++++ 4 files changed, 276 insertions(+), 134 deletions(-) create mode 100644 pkg/models/enums.go diff --git a/internal/manager/manager_tasks.go b/internal/manager/manager_tasks.go index c9e840519..eaa27aa87 100644 --- a/internal/manager/manager_tasks.go +++ b/internal/manager/manager_tasks.go @@ -16,6 +16,7 @@ import ( "github.com/stashapp/stash/pkg/job" "github.com/stashapp/stash/pkg/logger" "github.com/stashapp/stash/pkg/models" + "github.com/stashapp/stash/pkg/utils" ) func useAsVideo(pathname string) bool { @@ -117,14 +118,11 @@ type ScanMetaDataFilterInput struct { MinModTime *time.Time `json:"minModTime"` } -func (s *Manager) Scan(ctx context.Context, input ScanMetadataInput) (int, error) { - if err := s.validateFFmpeg(); err != nil { - return 0, err - } - +func (s *Manager) makeScanner(rescan bool, minModTime time.Time) *file.Scanner { cfg := config.GetInstance() + repo := s.Repository - scanner := &file.Scanner{ + return &file.Scanner{ Repository: file.NewRepository(s.Repository), FileDecorators: []file.Decorator{ &file.FilteredDecorator{ @@ -143,19 +141,119 @@ func (s *Manager) Scan(ctx context.Context, input ScanMetadataInput) (int, error FingerprintCalculator: &fingerprintCalculator{s.Config}, FS: &file.OsFS{}, ZipFileExtensions: cfg.GetGalleryExtensions(), - // ScanFilters is set in ScanJob.Execute - // HandlerRequiredFilters is set in ScanJob.Execute - RootPaths: cfg.GetStashPaths().Paths(), - Rescan: input.Rescan, + // FileHandlers must be set during execute as it needs progress reference + ScanFilters: []file.PathFilter{newScanFilter(cfg, repo, minModTime)}, + HandlerRequiredFilters: []file.Filter{newHandlerRequiredFilter(cfg, repo)}, + RootPaths: cfg.GetStashPaths().Paths(), + Rescan: rescan, } +} + +type ScanGenerateOptions struct { + // Generate covers during scan + GenerateCovers *models.GenerateTiming `json:"generateCovers,omitempty"` + // Generate video previews during scan + GeneratePreviews *models.GenerateTiming `json:"generatePreviews,omitempty"` + // Generate image previews during scan + GenerateImagePreviews *models.GenerateTiming `json:"generateImagePreviews,omitempty"` + // Generate video sprites during scan + GenerateSprites *models.GenerateTiming `json:"generateSprites,omitempty"` + // Generate video phashes during scan + GenerateVideoPhashes *models.GenerateTiming `json:"generateVideoPhashes,omitempty"` + // Generate image phashes during scan + GenerateImagePhashes *models.GenerateTiming `json:"generateImagePhashes,omitempty"` + // Generate image thumbnails during scan + GenerateThumbnails *models.GenerateTiming `json:"generateThumbnails,omitempty"` + // Generate image clip previews during scan + GenerateClipPreviews *models.GenerateTiming `json:"generateClipPreviews,omitempty"` +} + +func makeGenerateOptions(input ScanMetadataInput) ScanGenerateOptions { + cfg := config.GetInstance() + timing := models.GenerateTimingAsync + if cfg.GetSequentialScanning() { + timing = models.GenerateTimingSync + } + + setTiming := func(include bool) *models.GenerateTiming { + if include { + return &timing + } + return nil + } + + setTimingSync := func(include bool) *models.GenerateTiming { + if include { + v := models.GenerateTimingSync + return &v + } + return nil + } + + return ScanGenerateOptions{ + // covers should be generated synchronously to ensure they are available + // when the scene is created, as they are used as the default cover + GenerateCovers: setTimingSync(input.ScanGenerateCovers), + GeneratePreviews: setTiming(input.ScanGeneratePreviews), + GenerateImagePreviews: setTiming(input.ScanGenerateImagePreviews), + GenerateSprites: setTiming(input.ScanGenerateSprites), + GenerateVideoPhashes: setTiming(input.ScanGeneratePhashes), + GenerateThumbnails: setTiming(input.ScanGenerateThumbnails), + GenerateClipPreviews: setTiming(input.ScanGenerateClipPreviews), + GenerateImagePhashes: setTiming(input.ScanGenerateImagePhashes), + } +} + +func (s *Manager) Scan(ctx context.Context, input ScanMetadataInput) (int, error) { + if err := s.validateFFmpeg(); err != nil { + return 0, err + } + + var minModTime time.Time + if input.Filter != nil && input.Filter.MinModTime != nil { + minModTime = *input.Filter.MinModTime + } + + scanner := s.makeScanner(input.Rescan, minModTime) + + // we want the generate task to run concurrently, but it shouldn't be created first + jobID := s.JobManager.Add(ctx, "Scanning...", job.MakeJobExec(func(ctx context.Context, progress *job.Progress) error { + taskQueueChan := make(chan *job.TaskQueue) + + // add the generate task in here + s.JobManager.Start(ctx, "Generating for scanned files...", job.MakeJobExec(func(ctx context.Context, progress *job.Progress) error { + const taskQueueSize = 200000 + nTasks := s.Config.GetParallelTasksWithAutoDetection() + taskQueue := job.NewTaskQueue(ctx, progress, taskQueueSize, nTasks) + + taskQueueChan <- taskQueue + close(taskQueueChan) + + taskQueue.Wait() + logger.Debug("Finished generating from scanned files") + return nil + })) + + taskQueue := <-taskQueueChan scanJob := ScanJob{ - scanner: scanner, - input: input, - subscriptions: s.scanSubs, + scanner: scanner, + generateOptions: makeGenerateOptions(input), + paths: input.Paths, + taskQueue: taskQueue, + subscriptions: s.scanSubs, } - return s.JobManager.Add(ctx, "Scanning...", &scanJob), nil + scanJob.Execute(ctx, progress) + + // close task queue + taskQueue.Close() + + return nil + })) + + return jobID, nil +} } func (s *Manager) Import(ctx context.Context) (int, error) { diff --git a/internal/manager/task_generate_preview.go b/internal/manager/task_generate_preview.go index df2a69ee5..f3460bd1c 100644 --- a/internal/manager/task_generate_preview.go +++ b/internal/manager/task_generate_preview.go @@ -12,6 +12,7 @@ import ( type GeneratePreviewTask struct { Scene models.Scene + VideoPreview bool ImagePreview bool Options generate.PreviewOptions @@ -84,6 +85,10 @@ func (t *GeneratePreviewTask) required() bool { } func (t *GeneratePreviewTask) videoPreviewRequired() bool { + if !t.VideoPreview { + return false + } + if t.Scene.Path == "" { return false } diff --git a/internal/manager/task_scan.go b/internal/manager/task_scan.go index a006abbf8..2309bfef2 100644 --- a/internal/manager/task_scan.go +++ b/internal/manager/task_scan.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "io/fs" + "os" "path/filepath" "regexp" "runtime/debug" @@ -30,9 +31,11 @@ import ( ) type ScanJob struct { - scanner *file.Scanner - input ScanMetadataInput - subscriptions *subscriptionManager + scanner *file.Scanner + paths []string + generateOptions ScanGenerateOptions + taskQueue *job.TaskQueue + subscriptions *subscriptionManager fileQueue chan file.ScannedFile count int @@ -42,46 +45,29 @@ type ScanJob struct { func (j *ScanJob) Execute(ctx context.Context, progress *job.Progress) error { cfg := config.GetInstance() - input := j.input if job.IsCancelled(ctx) { logger.Info("Stopping due to user request") return nil } - sp := getScanPaths(input.Paths) + sp := getScanPaths(j.paths) paths := make([]string, len(sp)) for i, p := range sp { paths[i] = p.Path } - mgr := GetInstance() - c := mgr.Config - repo := mgr.Repository - start := time.Now() nTasks := cfg.GetParallelTasksWithAutoDetection() - const taskQueueSize = 200000 - taskQueue := job.NewTaskQueue(ctx, progress, taskQueueSize, nTasks) - - var minModTime time.Time - if j.input.Filter != nil && j.input.Filter.MinModTime != nil { - minModTime = *j.input.Filter.MinModTime - } - // HACK - these should really be set in the scanner initialization - j.scanner.FileHandlers = getScanHandlers(j.input, taskQueue, progress) - j.scanner.ScanFilters = []file.PathFilter{newScanFilter(c, repo, minModTime)} - j.scanner.HandlerRequiredFilters = []file.Filter{newHandlerRequiredFilter(cfg, repo)} + j.scanner.FileHandlers = getScanHandlers(j.generateOptions, j.taskQueue, progress) logger.Infof("Starting scan of %d paths with %d parallel tasks", len(paths), nTasks) j.runJob(ctx, paths, nTasks, progress) - taskQueue.Close() - if job.IsCancelled(ctx) { logger.Info("Stopping due to user request") return nil @@ -153,6 +139,33 @@ func (j *ScanJob) queueFiles(ctx context.Context, paths []string, progress *job. return err } +func makeScannedFile(f models.FS, path string, info fs.FileInfo, zipFile *file.ScannedFile) (*file.ScannedFile, error) { + size, err := file.GetFileSize(f, path, info) + if err != nil { + return nil, err + } + + ff := file.ScannedFile{ + BaseFile: &models.BaseFile{ + DirEntry: models.DirEntry{ + ModTime: file.ModTime(info), + }, + Path: path, + Basename: filepath.Base(path), + Size: size, + }, + FS: f, + Info: info, + } + + if zipFile != nil { + ff.ZipFileID = &zipFile.ID + ff.ZipFile = zipFile + } + + return &ff, nil +} + func (j *ScanJob) queueFileFunc(ctx context.Context, f models.FS, zipFile *file.ScannedFile, progress *job.Progress) fs.WalkDirFunc { return func(path string, d fs.DirEntry, err error) error { if err != nil { @@ -197,32 +210,14 @@ func (j *ScanJob) queueFileFunc(ctx context.Context, f models.FS, zipFile *file. return nil } - size, err := file.GetFileSize(f, path, info) + ff, err := makeScannedFile(f, path, info, zipFile) if err != nil { return err } - ff := file.ScannedFile{ - BaseFile: &models.BaseFile{ - DirEntry: models.DirEntry{ - ModTime: file.ModTime(info), - }, - Path: path, - Basename: filepath.Base(path), - Size: size, - }, - FS: f, - Info: info, - } - - if zipFile != nil { - ff.ZipFileID = &zipFile.ID - ff.ZipFile = zipFile - } - if info.IsDir() { // handle folders immediately - if err := j.handleFolder(ctx, ff, progress); err != nil { + if err := j.handleFolder(ctx, *ff, progress); err != nil { if !errors.Is(err, context.Canceled) { logger.Errorf("error processing %q: %v", path, err) } @@ -238,7 +233,7 @@ func (j *ScanJob) queueFileFunc(ctx context.Context, f models.FS, zipFile *file. if zipFile != nil { progress.ExecuteTask("Scanning "+path, func() { // don't increment progress in zip files - if err := j.handleFile(ctx, ff, nil); err != nil { + if err := j.handleFile(ctx, *ff, nil); err != nil { if !errors.Is(err, context.Canceled) { logger.Errorf("error processing %q: %v", path, err) } @@ -250,7 +245,7 @@ func (j *ScanJob) queueFileFunc(ctx context.Context, f models.FS, zipFile *file. } logger.Tracef("Queueing file %s for scanning", path) - j.fileQueue <- ff + j.fileQueue <- *ff j.count++ @@ -623,9 +618,6 @@ func (f *scanFilter) Accept(ctx context.Context, path string, info fs.FileInfo) } type scanConfig struct { - isGenerateThumbnails bool - isGenerateClipPreviews bool - createGalleriesFromFolders bool } @@ -645,7 +637,7 @@ func galleryFileFilter(ctx context.Context, f models.File) bool { return isZip(f.Base().Basename) } -func getScanHandlers(options ScanMetadataInput, taskQueue *job.TaskQueue, progress *job.Progress) []file.Handler { +func getScanHandlers(options ScanGenerateOptions, taskQueue *job.TaskQueue, progress *job.Progress) []file.Handler { mgr := GetInstance() c := mgr.Config r := mgr.Repository @@ -658,15 +650,13 @@ func getScanHandlers(options ScanMetadataInput, taskQueue *job.TaskQueue, progre CreatorUpdater: r.Image, GalleryFinder: r.Gallery, ScanGenerator: &imageGenerators{ - input: options, - taskQueue: taskQueue, - progress: progress, - paths: mgr.Paths, - sequentialScanning: c.GetSequentialScanning(), + generators: generators{ + taskQueue: taskQueue, + }, + options: options, + paths: mgr.Paths, }, ScanConfig: &scanConfig{ - isGenerateThumbnails: options.ScanGenerateThumbnails, - isGenerateClipPreviews: options.ScanGenerateClipPreviews, createGalleriesFromFolders: c.GetCreateGalleriesFromFolders(), }, PluginCache: pluginCache, @@ -689,12 +679,12 @@ func getScanHandlers(options ScanMetadataInput, taskQueue *job.TaskQueue, progre CaptionUpdater: r.File, PluginCache: pluginCache, ScanGenerator: &sceneGenerators{ - input: options, - taskQueue: taskQueue, - progress: progress, + generators: generators{ + taskQueue: taskQueue, + }, + options: options, paths: mgr.Paths, fileNamingAlgorithm: c.GetVideoFileNamingAlgorithm(), - sequentialScanning: c.GetSequentialScanning(), }, FileNamingAlgorithm: c.GetVideoFileNamingAlgorithm(), Paths: mgr.Paths, @@ -703,20 +693,29 @@ func getScanHandlers(options ScanMetadataInput, taskQueue *job.TaskQueue, progre } } -type imageGenerators struct { - input ScanMetadataInput +type generators struct { taskQueue *job.TaskQueue - progress *job.Progress +} - paths *paths.Paths - sequentialScanning bool +func (g *generators) generate(ctx context.Context, desc string, generateFunc func(ctx context.Context), timing models.GenerateTiming) { + if timing == models.GenerateTimingSync { + generateFunc(ctx) + } else { + g.taskQueue.Add(desc, generateFunc) + } +} + +type imageGenerators struct { + generators + options ScanGenerateOptions + + paths *paths.Paths } func (g *imageGenerators) Generate(ctx context.Context, i *models.Image, f models.File) error { const overwrite = false - progress := g.progress - t := g.input + t := g.options path := f.Base().Path // this is a bit of a hack: the task requires files to be loaded, but @@ -724,20 +723,18 @@ func (g *imageGenerators) Generate(ctx context.Context, i *models.Image, f model ii := *i ii.Files = models.NewRelatedFiles([]models.File{f}) - if t.ScanGenerateThumbnails { - // this should be quick, so always generate sequentially + if t.GenerateThumbnails != nil { taskThumbnail := GenerateImageThumbnailTask{ Image: ii, Overwrite: overwrite, } - taskThumbnail.Start(ctx) + g.generate(ctx, fmt.Sprintf("Generating thumbnail for %s", path), taskThumbnail.Start, *t.GenerateThumbnails) } // avoid adding a task if the file isn't a video file _, isVideo := f.(*models.VideoFile) - if isVideo && t.ScanGenerateClipPreviews { - progress.AddTotal(1) + if isVideo && t.GenerateClipPreviews != nil { previewsFn := func(ctx context.Context) { taskPreview := GenerateClipPreviewTask{ Image: ii, @@ -745,18 +742,12 @@ func (g *imageGenerators) Generate(ctx context.Context, i *models.Image, f model } taskPreview.Start(ctx) - progress.Increment() } - if g.sequentialScanning { - previewsFn(ctx) - } else { - g.taskQueue.Add(fmt.Sprintf("Generating preview for %s", path), previewsFn) - } + g.generate(ctx, fmt.Sprintf("Generating preview for %s", path), previewsFn, *t.GenerateClipPreviews) } - if t.ScanGenerateImagePhashes { - progress.AddTotal(1) + if t.GenerateImagePhashes != nil { phashFn := func(ctx context.Context) { mgr := GetInstance() // Only generate phash for image files, not video files @@ -768,40 +759,31 @@ func (g *imageGenerators) Generate(ctx context.Context, i *models.Image, f model } taskPhash.Start(ctx) } - progress.Increment() } - if g.sequentialScanning { - phashFn(ctx) - } else { - g.taskQueue.Add(fmt.Sprintf("Generating phash for %s", path), phashFn) - } + g.generate(ctx, fmt.Sprintf("Generating phash for %s", path), phashFn, *t.GenerateImagePhashes) } return nil } type sceneGenerators struct { - input ScanMetadataInput - taskQueue *job.TaskQueue - progress *job.Progress + generators + options ScanGenerateOptions paths *paths.Paths fileNamingAlgorithm models.HashAlgorithm - sequentialScanning bool } func (g *sceneGenerators) Generate(ctx context.Context, s *models.Scene, f *models.VideoFile) error { const overwrite = false - progress := g.progress - t := g.input + t := g.options path := f.Path mgr := GetInstance() - if t.ScanGenerateSprites { - progress.AddTotal(1) + if t.GenerateSprites != nil { spriteFn := func(ctx context.Context) { taskSprite := GenerateSpriteTask{ Scene: *s, @@ -809,18 +791,12 @@ func (g *sceneGenerators) Generate(ctx context.Context, s *models.Scene, f *mode fileNamingAlgorithm: g.fileNamingAlgorithm, } taskSprite.Start(ctx) - progress.Increment() } - if g.sequentialScanning { - spriteFn(ctx) - } else { - g.taskQueue.Add(fmt.Sprintf("Generating sprites for %s", path), spriteFn) - } + g.generate(ctx, fmt.Sprintf("Generating sprites for %s", path), spriteFn, *t.GenerateSprites) } - if t.ScanGeneratePhashes { - progress.AddTotal(1) + if t.GenerateVideoPhashes != nil { phashFn := func(ctx context.Context) { taskPhash := GeneratePhashTask{ repository: mgr.Repository, @@ -829,18 +805,12 @@ func (g *sceneGenerators) Generate(ctx context.Context, s *models.Scene, f *mode fileNamingAlgorithm: g.fileNamingAlgorithm, } taskPhash.Start(ctx) - progress.Increment() } - if g.sequentialScanning { - phashFn(ctx) - } else { - g.taskQueue.Add(fmt.Sprintf("Generating phash for %s", path), phashFn) - } + g.generate(ctx, fmt.Sprintf("Generating phash for %s", path), phashFn, *t.GenerateVideoPhashes) } - if t.ScanGeneratePreviews { - progress.AddTotal(1) + if t.GeneratePreviews != nil || t.GenerateImagePreviews != nil { previewsFn := func(ctx context.Context) { options := getGeneratePreviewOptions(GeneratePreviewOptionsInput{}) @@ -855,34 +825,36 @@ func (g *sceneGenerators) Generate(ctx context.Context, s *models.Scene, f *mode taskPreview := GeneratePreviewTask{ Scene: *s, - ImagePreview: t.ScanGenerateImagePreviews, + VideoPreview: t.GeneratePreviews != nil, + ImagePreview: t.GenerateImagePreviews != nil, Options: options, Overwrite: overwrite, fileNamingAlgorithm: g.fileNamingAlgorithm, generator: generator, } taskPreview.Start(ctx) - progress.Increment() } - if g.sequentialScanning { - previewsFn(ctx) - } else { - g.taskQueue.Add(fmt.Sprintf("Generating preview for %s", path), previewsFn) + // use value of generate previews if present, otherwise use images + timing := t.GeneratePreviews + if timing == nil { + timing = t.GenerateImagePreviews } + + g.generate(ctx, fmt.Sprintf("Generating preview for %s", path), previewsFn, *timing) } - if t.ScanGenerateCovers { - progress.AddTotal(1) - g.taskQueue.Add(fmt.Sprintf("Generating cover for %s", path), func(ctx context.Context) { + if t.GenerateCovers != nil { + fn := func(ctx context.Context) { taskCover := GenerateCoverTask{ repository: mgr.Repository, Scene: *s, Overwrite: overwrite, } taskCover.Start(ctx) - progress.Increment() - }) + } + + g.generate(ctx, fmt.Sprintf("Generating cover for %s", path), fn, *t.GenerateCovers) } return nil diff --git a/pkg/models/enums.go b/pkg/models/enums.go new file mode 100644 index 000000000..cf481c516 --- /dev/null +++ b/pkg/models/enums.go @@ -0,0 +1,67 @@ +package models + +import ( + "bytes" + "fmt" + "io" + "strconv" +) + +type GenerateTiming string + +const ( + // Generate metadata before returning from the scan API call. + // This will block the API call until generation is complete, and is not recommended for large artifacts. + // Recommended when you want the metadata/artifact to be available immediately after the scan completes (ie covers and phashes). + GenerateTimingSync GenerateTiming = "SYNC" + // Generate metadata asynchronously after the scan completes. + GenerateTimingAsync GenerateTiming = "ASYNC" +) + +var AllGenerateTiming = []GenerateTiming{ + GenerateTimingSync, + GenerateTimingAsync, +} + +func (e GenerateTiming) IsValid() bool { + switch e { + case GenerateTimingSync, GenerateTimingAsync: + return true + } + return false +} + +func (e GenerateTiming) String() string { + return string(e) +} + +func (e *GenerateTiming) UnmarshalGQL(v any) error { + str, ok := v.(string) + if !ok { + return fmt.Errorf("enums must be strings") + } + + *e = GenerateTiming(str) + if !e.IsValid() { + return fmt.Errorf("%s is not a valid GenerateTiming", str) + } + return nil +} + +func (e GenerateTiming) MarshalGQL(w io.Writer) { + fmt.Fprint(w, strconv.Quote(e.String())) +} + +func (e *GenerateTiming) UnmarshalJSON(b []byte) error { + s, err := strconv.Unquote(string(b)) + if err != nil { + return err + } + return e.UnmarshalGQL(s) +} + +func (e GenerateTiming) MarshalJSON() ([]byte, error) { + var buf bytes.Buffer + e.MarshalGQL(&buf) + return buf.Bytes(), nil +}