Refactor scan to use separate job for generation

This commit is contained in:
WithoutPants 2026-03-16 17:43:11 +11:00
parent 2f230e5897
commit 56bfc43118
4 changed files with 276 additions and 134 deletions

View file

@ -16,6 +16,7 @@ import (
"github.com/stashapp/stash/pkg/job"
"github.com/stashapp/stash/pkg/logger"
"github.com/stashapp/stash/pkg/models"
"github.com/stashapp/stash/pkg/utils"
)
func useAsVideo(pathname string) bool {
@ -117,14 +118,11 @@ type ScanMetaDataFilterInput struct {
MinModTime *time.Time `json:"minModTime"`
}
func (s *Manager) Scan(ctx context.Context, input ScanMetadataInput) (int, error) {
if err := s.validateFFmpeg(); err != nil {
return 0, err
}
func (s *Manager) makeScanner(rescan bool, minModTime time.Time) *file.Scanner {
cfg := config.GetInstance()
repo := s.Repository
scanner := &file.Scanner{
return &file.Scanner{
Repository: file.NewRepository(s.Repository),
FileDecorators: []file.Decorator{
&file.FilteredDecorator{
@ -143,19 +141,119 @@ func (s *Manager) Scan(ctx context.Context, input ScanMetadataInput) (int, error
FingerprintCalculator: &fingerprintCalculator{s.Config},
FS: &file.OsFS{},
ZipFileExtensions: cfg.GetGalleryExtensions(),
// ScanFilters is set in ScanJob.Execute
// HandlerRequiredFilters is set in ScanJob.Execute
RootPaths: cfg.GetStashPaths().Paths(),
Rescan: input.Rescan,
// FileHandlers must be set during execute as it needs progress reference
ScanFilters: []file.PathFilter{newScanFilter(cfg, repo, minModTime)},
HandlerRequiredFilters: []file.Filter{newHandlerRequiredFilter(cfg, repo)},
RootPaths: cfg.GetStashPaths().Paths(),
Rescan: rescan,
}
}
// ScanGenerateOptions selects which artifacts are generated for scanned files
// and when. Each field is optional: a nil field means that artifact is not
// generated by the scan, while a non-nil field holds the models.GenerateTiming
// to use when generating it.
type ScanGenerateOptions struct {
// Generate covers during scan
GenerateCovers *models.GenerateTiming `json:"generateCovers,omitempty"`
// Generate video previews during scan
GeneratePreviews *models.GenerateTiming `json:"generatePreviews,omitempty"`
// Generate image previews during scan
GenerateImagePreviews *models.GenerateTiming `json:"generateImagePreviews,omitempty"`
// Generate video sprites during scan
GenerateSprites *models.GenerateTiming `json:"generateSprites,omitempty"`
// Generate video phashes during scan
GenerateVideoPhashes *models.GenerateTiming `json:"generateVideoPhashes,omitempty"`
// Generate image phashes during scan
GenerateImagePhashes *models.GenerateTiming `json:"generateImagePhashes,omitempty"`
// Generate image thumbnails during scan
GenerateThumbnails *models.GenerateTiming `json:"generateThumbnails,omitempty"`
// Generate image clip previews during scan
GenerateClipPreviews *models.GenerateTiming `json:"generateClipPreviews,omitempty"`
}
// makeGenerateOptions converts the per-artifact boolean flags of a scan
// request into ScanGenerateOptions.
//
// An artifact whose flag is false gets a nil timing. Covers are always
// generated synchronously; every other requested artifact is generated
// synchronously when sequential scanning is configured and asynchronously
// otherwise.
func makeGenerateOptions(input ScanMetadataInput) ScanGenerateOptions {
	defaultTiming := models.GenerateTimingAsync
	if config.GetInstance().GetSequentialScanning() {
		defaultTiming = models.GenerateTimingSync
	}

	// timingIf returns nil when include is false. Otherwise it returns a
	// pointer to its own copy of timing, so that no two option fields share
	// the same underlying value.
	timingIf := func(include bool, timing models.GenerateTiming) *models.GenerateTiming {
		if !include {
			return nil
		}
		return &timing
	}

	return ScanGenerateOptions{
		// covers should be generated synchronously to ensure they are available
		// when the scene is created, as they are used as the default cover
		GenerateCovers:        timingIf(input.ScanGenerateCovers, models.GenerateTimingSync),
		GeneratePreviews:      timingIf(input.ScanGeneratePreviews, defaultTiming),
		GenerateImagePreviews: timingIf(input.ScanGenerateImagePreviews, defaultTiming),
		GenerateSprites:       timingIf(input.ScanGenerateSprites, defaultTiming),
		GenerateVideoPhashes:  timingIf(input.ScanGeneratePhashes, defaultTiming),
		GenerateThumbnails:    timingIf(input.ScanGenerateThumbnails, defaultTiming),
		GenerateClipPreviews:  timingIf(input.ScanGenerateClipPreviews, defaultTiming),
		GenerateImagePhashes:  timingIf(input.ScanGenerateImagePhashes, defaultTiming),
	}
}
func (s *Manager) Scan(ctx context.Context, input ScanMetadataInput) (int, error) {
if err := s.validateFFmpeg(); err != nil {
return 0, err
}
var minModTime time.Time
if input.Filter != nil && input.Filter.MinModTime != nil {
minModTime = *input.Filter.MinModTime
}
scanner := s.makeScanner(input.Rescan, minModTime)
// we want the generate task to run concurrently, but it shouldn't be created first
jobID := s.JobManager.Add(ctx, "Scanning...", job.MakeJobExec(func(ctx context.Context, progress *job.Progress) error {
taskQueueChan := make(chan *job.TaskQueue)
// add the generate task in here
s.JobManager.Start(ctx, "Generating for scanned files...", job.MakeJobExec(func(ctx context.Context, progress *job.Progress) error {
const taskQueueSize = 200000
nTasks := s.Config.GetParallelTasksWithAutoDetection()
taskQueue := job.NewTaskQueue(ctx, progress, taskQueueSize, nTasks)
taskQueueChan <- taskQueue
close(taskQueueChan)
taskQueue.Wait()
logger.Debug("Finished generating from scanned files")
return nil
}))
taskQueue := <-taskQueueChan
scanJob := ScanJob{
scanner: scanner,
input: input,
subscriptions: s.scanSubs,
scanner: scanner,
generateOptions: makeGenerateOptions(input),
paths: input.Paths,
taskQueue: taskQueue,
subscriptions: s.scanSubs,
}
return s.JobManager.Add(ctx, "Scanning...", &scanJob), nil
scanJob.Execute(ctx, progress)
// close task queue
taskQueue.Close()
return nil
}))
return jobID, nil
}
}
func (s *Manager) Import(ctx context.Context) (int, error) {

View file

@ -12,6 +12,7 @@ import (
type GeneratePreviewTask struct {
Scene models.Scene
VideoPreview bool
ImagePreview bool
Options generate.PreviewOptions
@ -84,6 +85,10 @@ func (t *GeneratePreviewTask) required() bool {
}
func (t *GeneratePreviewTask) videoPreviewRequired() bool {
if !t.VideoPreview {
return false
}
if t.Scene.Path == "" {
return false
}

View file

@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io/fs"
"os"
"path/filepath"
"regexp"
"runtime/debug"
@ -30,9 +31,11 @@ import (
)
type ScanJob struct {
scanner *file.Scanner
input ScanMetadataInput
subscriptions *subscriptionManager
scanner *file.Scanner
paths []string
generateOptions ScanGenerateOptions
taskQueue *job.TaskQueue
subscriptions *subscriptionManager
fileQueue chan file.ScannedFile
count int
@ -42,46 +45,29 @@ type ScanJob struct {
func (j *ScanJob) Execute(ctx context.Context, progress *job.Progress) error {
cfg := config.GetInstance()
input := j.input
if job.IsCancelled(ctx) {
logger.Info("Stopping due to user request")
return nil
}
sp := getScanPaths(input.Paths)
sp := getScanPaths(j.paths)
paths := make([]string, len(sp))
for i, p := range sp {
paths[i] = p.Path
}
mgr := GetInstance()
c := mgr.Config
repo := mgr.Repository
start := time.Now()
nTasks := cfg.GetParallelTasksWithAutoDetection()
const taskQueueSize = 200000
taskQueue := job.NewTaskQueue(ctx, progress, taskQueueSize, nTasks)
var minModTime time.Time
if j.input.Filter != nil && j.input.Filter.MinModTime != nil {
minModTime = *j.input.Filter.MinModTime
}
// HACK - these should really be set in the scanner initialization
j.scanner.FileHandlers = getScanHandlers(j.input, taskQueue, progress)
j.scanner.ScanFilters = []file.PathFilter{newScanFilter(c, repo, minModTime)}
j.scanner.HandlerRequiredFilters = []file.Filter{newHandlerRequiredFilter(cfg, repo)}
j.scanner.FileHandlers = getScanHandlers(j.generateOptions, j.taskQueue, progress)
logger.Infof("Starting scan of %d paths with %d parallel tasks", len(paths), nTasks)
j.runJob(ctx, paths, nTasks, progress)
taskQueue.Close()
if job.IsCancelled(ctx) {
logger.Info("Stopping due to user request")
return nil
@ -153,6 +139,33 @@ func (j *ScanJob) queueFiles(ctx context.Context, paths []string, progress *job.
return err
}
// makeScannedFile assembles the file.ScannedFile for the entry at path on
// filesystem f, using info for its modification time and size lookup.
//
// When zipFile is non-nil the result is linked to it through both ZipFileID
// and ZipFile. An error from file.GetFileSize is returned unchanged with a
// nil result.
func makeScannedFile(f models.FS, path string, info fs.FileInfo, zipFile *file.ScannedFile) (*file.ScannedFile, error) {
	size, err := file.GetFileSize(f, path, info)
	if err != nil {
		return nil, err
	}

	base := &models.BaseFile{
		DirEntry: models.DirEntry{
			ModTime: file.ModTime(info),
		},
		Path:     path,
		Basename: filepath.Base(path),
		Size:     size,
	}

	scanned := &file.ScannedFile{
		BaseFile: base,
		FS:       f,
		Info:     info,
	}

	// entries that do not live inside a zip archive are complete at this point
	if zipFile == nil {
		return scanned, nil
	}

	scanned.ZipFileID = &zipFile.ID
	scanned.ZipFile = zipFile
	return scanned, nil
}
func (j *ScanJob) queueFileFunc(ctx context.Context, f models.FS, zipFile *file.ScannedFile, progress *job.Progress) fs.WalkDirFunc {
return func(path string, d fs.DirEntry, err error) error {
if err != nil {
@ -197,32 +210,14 @@ func (j *ScanJob) queueFileFunc(ctx context.Context, f models.FS, zipFile *file.
return nil
}
size, err := file.GetFileSize(f, path, info)
ff, err := makeScannedFile(f, path, info, zipFile)
if err != nil {
return err
}
ff := file.ScannedFile{
BaseFile: &models.BaseFile{
DirEntry: models.DirEntry{
ModTime: file.ModTime(info),
},
Path: path,
Basename: filepath.Base(path),
Size: size,
},
FS: f,
Info: info,
}
if zipFile != nil {
ff.ZipFileID = &zipFile.ID
ff.ZipFile = zipFile
}
if info.IsDir() {
// handle folders immediately
if err := j.handleFolder(ctx, ff, progress); err != nil {
if err := j.handleFolder(ctx, *ff, progress); err != nil {
if !errors.Is(err, context.Canceled) {
logger.Errorf("error processing %q: %v", path, err)
}
@ -238,7 +233,7 @@ func (j *ScanJob) queueFileFunc(ctx context.Context, f models.FS, zipFile *file.
if zipFile != nil {
progress.ExecuteTask("Scanning "+path, func() {
// don't increment progress in zip files
if err := j.handleFile(ctx, ff, nil); err != nil {
if err := j.handleFile(ctx, *ff, nil); err != nil {
if !errors.Is(err, context.Canceled) {
logger.Errorf("error processing %q: %v", path, err)
}
@ -250,7 +245,7 @@ func (j *ScanJob) queueFileFunc(ctx context.Context, f models.FS, zipFile *file.
}
logger.Tracef("Queueing file %s for scanning", path)
j.fileQueue <- ff
j.fileQueue <- *ff
j.count++
@ -623,9 +618,6 @@ func (f *scanFilter) Accept(ctx context.Context, path string, info fs.FileInfo)
}
type scanConfig struct {
isGenerateThumbnails bool
isGenerateClipPreviews bool
createGalleriesFromFolders bool
}
@ -645,7 +637,7 @@ func galleryFileFilter(ctx context.Context, f models.File) bool {
return isZip(f.Base().Basename)
}
func getScanHandlers(options ScanMetadataInput, taskQueue *job.TaskQueue, progress *job.Progress) []file.Handler {
func getScanHandlers(options ScanGenerateOptions, taskQueue *job.TaskQueue, progress *job.Progress) []file.Handler {
mgr := GetInstance()
c := mgr.Config
r := mgr.Repository
@ -658,15 +650,13 @@ func getScanHandlers(options ScanMetadataInput, taskQueue *job.TaskQueue, progre
CreatorUpdater: r.Image,
GalleryFinder: r.Gallery,
ScanGenerator: &imageGenerators{
input: options,
taskQueue: taskQueue,
progress: progress,
paths: mgr.Paths,
sequentialScanning: c.GetSequentialScanning(),
generators: generators{
taskQueue: taskQueue,
},
options: options,
paths: mgr.Paths,
},
ScanConfig: &scanConfig{
isGenerateThumbnails: options.ScanGenerateThumbnails,
isGenerateClipPreviews: options.ScanGenerateClipPreviews,
createGalleriesFromFolders: c.GetCreateGalleriesFromFolders(),
},
PluginCache: pluginCache,
@ -689,12 +679,12 @@ func getScanHandlers(options ScanMetadataInput, taskQueue *job.TaskQueue, progre
CaptionUpdater: r.File,
PluginCache: pluginCache,
ScanGenerator: &sceneGenerators{
input: options,
taskQueue: taskQueue,
progress: progress,
generators: generators{
taskQueue: taskQueue,
},
options: options,
paths: mgr.Paths,
fileNamingAlgorithm: c.GetVideoFileNamingAlgorithm(),
sequentialScanning: c.GetSequentialScanning(),
},
FileNamingAlgorithm: c.GetVideoFileNamingAlgorithm(),
Paths: mgr.Paths,
@ -703,20 +693,29 @@ func getScanHandlers(options ScanMetadataInput, taskQueue *job.TaskQueue, progre
}
}
type imageGenerators struct {
input ScanMetadataInput
type generators struct {
taskQueue *job.TaskQueue
progress *job.Progress
}
paths *paths.Paths
sequentialScanning bool
// generate runs generateFunc according to timing: with
// models.GenerateTimingSync it is called inline with ctx before generate
// returns; with any other timing it is added to the task queue under desc.
func (g *generators) generate(ctx context.Context, desc string, generateFunc func(ctx context.Context), timing models.GenerateTiming) {
	if timing != models.GenerateTimingSync {
		g.taskQueue.Add(desc, generateFunc)
		return
	}

	generateFunc(ctx)
}
// imageGenerators holds the state used when generating artifacts for scanned
// images: the embedded generators value, the generate options that apply to
// the scan, and the paths value.
type imageGenerators struct {
generators
options ScanGenerateOptions
paths *paths.Paths
}
func (g *imageGenerators) Generate(ctx context.Context, i *models.Image, f models.File) error {
const overwrite = false
progress := g.progress
t := g.input
t := g.options
path := f.Base().Path
// this is a bit of a hack: the task requires files to be loaded, but
@ -724,20 +723,18 @@ func (g *imageGenerators) Generate(ctx context.Context, i *models.Image, f model
ii := *i
ii.Files = models.NewRelatedFiles([]models.File{f})
if t.ScanGenerateThumbnails {
// this should be quick, so always generate sequentially
if t.GenerateThumbnails != nil {
taskThumbnail := GenerateImageThumbnailTask{
Image: ii,
Overwrite: overwrite,
}
taskThumbnail.Start(ctx)
g.generate(ctx, fmt.Sprintf("Generating thumbnail for %s", path), taskThumbnail.Start, *t.GenerateThumbnails)
}
// avoid adding a task if the file isn't a video file
_, isVideo := f.(*models.VideoFile)
if isVideo && t.ScanGenerateClipPreviews {
progress.AddTotal(1)
if isVideo && t.GenerateClipPreviews != nil {
previewsFn := func(ctx context.Context) {
taskPreview := GenerateClipPreviewTask{
Image: ii,
@ -745,18 +742,12 @@ func (g *imageGenerators) Generate(ctx context.Context, i *models.Image, f model
}
taskPreview.Start(ctx)
progress.Increment()
}
if g.sequentialScanning {
previewsFn(ctx)
} else {
g.taskQueue.Add(fmt.Sprintf("Generating preview for %s", path), previewsFn)
}
g.generate(ctx, fmt.Sprintf("Generating preview for %s", path), previewsFn, *t.GenerateClipPreviews)
}
if t.ScanGenerateImagePhashes {
progress.AddTotal(1)
if t.GenerateImagePhashes != nil {
phashFn := func(ctx context.Context) {
mgr := GetInstance()
// Only generate phash for image files, not video files
@ -768,40 +759,31 @@ func (g *imageGenerators) Generate(ctx context.Context, i *models.Image, f model
}
taskPhash.Start(ctx)
}
progress.Increment()
}
if g.sequentialScanning {
phashFn(ctx)
} else {
g.taskQueue.Add(fmt.Sprintf("Generating phash for %s", path), phashFn)
}
g.generate(ctx, fmt.Sprintf("Generating phash for %s", path), phashFn, *t.GenerateImagePhashes)
}
return nil
}
type sceneGenerators struct {
input ScanMetadataInput
taskQueue *job.TaskQueue
progress *job.Progress
generators
options ScanGenerateOptions
paths *paths.Paths
fileNamingAlgorithm models.HashAlgorithm
sequentialScanning bool
}
func (g *sceneGenerators) Generate(ctx context.Context, s *models.Scene, f *models.VideoFile) error {
const overwrite = false
progress := g.progress
t := g.input
t := g.options
path := f.Path
mgr := GetInstance()
if t.ScanGenerateSprites {
progress.AddTotal(1)
if t.GenerateSprites != nil {
spriteFn := func(ctx context.Context) {
taskSprite := GenerateSpriteTask{
Scene: *s,
@ -809,18 +791,12 @@ func (g *sceneGenerators) Generate(ctx context.Context, s *models.Scene, f *mode
fileNamingAlgorithm: g.fileNamingAlgorithm,
}
taskSprite.Start(ctx)
progress.Increment()
}
if g.sequentialScanning {
spriteFn(ctx)
} else {
g.taskQueue.Add(fmt.Sprintf("Generating sprites for %s", path), spriteFn)
}
g.generate(ctx, fmt.Sprintf("Generating sprites for %s", path), spriteFn, *t.GenerateSprites)
}
if t.ScanGeneratePhashes {
progress.AddTotal(1)
if t.GenerateVideoPhashes != nil {
phashFn := func(ctx context.Context) {
taskPhash := GeneratePhashTask{
repository: mgr.Repository,
@ -829,18 +805,12 @@ func (g *sceneGenerators) Generate(ctx context.Context, s *models.Scene, f *mode
fileNamingAlgorithm: g.fileNamingAlgorithm,
}
taskPhash.Start(ctx)
progress.Increment()
}
if g.sequentialScanning {
phashFn(ctx)
} else {
g.taskQueue.Add(fmt.Sprintf("Generating phash for %s", path), phashFn)
}
g.generate(ctx, fmt.Sprintf("Generating phash for %s", path), phashFn, *t.GenerateVideoPhashes)
}
if t.ScanGeneratePreviews {
progress.AddTotal(1)
if t.GeneratePreviews != nil || t.GenerateImagePreviews != nil {
previewsFn := func(ctx context.Context) {
options := getGeneratePreviewOptions(GeneratePreviewOptionsInput{})
@ -855,34 +825,36 @@ func (g *sceneGenerators) Generate(ctx context.Context, s *models.Scene, f *mode
taskPreview := GeneratePreviewTask{
Scene: *s,
ImagePreview: t.ScanGenerateImagePreviews,
VideoPreview: t.GeneratePreviews != nil,
ImagePreview: t.GenerateImagePreviews != nil,
Options: options,
Overwrite: overwrite,
fileNamingAlgorithm: g.fileNamingAlgorithm,
generator: generator,
}
taskPreview.Start(ctx)
progress.Increment()
}
if g.sequentialScanning {
previewsFn(ctx)
} else {
g.taskQueue.Add(fmt.Sprintf("Generating preview for %s", path), previewsFn)
// use value of generate previews if present, otherwise use images
timing := t.GeneratePreviews
if timing == nil {
timing = t.GenerateImagePreviews
}
g.generate(ctx, fmt.Sprintf("Generating preview for %s", path), previewsFn, *timing)
}
if t.ScanGenerateCovers {
progress.AddTotal(1)
g.taskQueue.Add(fmt.Sprintf("Generating cover for %s", path), func(ctx context.Context) {
if t.GenerateCovers != nil {
fn := func(ctx context.Context) {
taskCover := GenerateCoverTask{
repository: mgr.Repository,
Scene: *s,
Overwrite: overwrite,
}
taskCover.Start(ctx)
progress.Increment()
})
}
g.generate(ctx, fmt.Sprintf("Generating cover for %s", path), fn, *t.GenerateCovers)
}
return nil

67
pkg/models/enums.go Normal file
View file

@ -0,0 +1,67 @@
package models
import (
"bytes"
"fmt"
"io"
"strconv"
)
// GenerateTiming selects when a scan generates an artifact relative to the
// scan itself.
type GenerateTiming string

const (
	// GenerateTimingSync generates metadata before returning from the scan API call.
	// This will block the API call until generation is complete, and is not recommended for large artifacts.
	// Recommended when you want the metadata/artifact to be available immediately after the scan completes (e.g. covers and phashes).
	GenerateTimingSync GenerateTiming = "SYNC"
	// GenerateTimingAsync generates metadata asynchronously after the scan completes.
	GenerateTimingAsync GenerateTiming = "ASYNC"
)

// AllGenerateTiming lists every valid GenerateTiming value.
var AllGenerateTiming = []GenerateTiming{
	GenerateTimingSync,
	GenerateTimingAsync,
}

// IsValid reports whether e is one of the declared GenerateTiming values.
func (e GenerateTiming) IsValid() bool {
	switch e {
	case GenerateTimingSync, GenerateTimingAsync:
		return true
	}
	return false
}

// String returns the wire representation of e.
func (e GenerateTiming) String() string {
	return string(e)
}

// UnmarshalGQL sets e from a GraphQL input value. It returns an error when v
// is not a string or does not name a valid GenerateTiming; in both cases e is
// left unchanged.
func (e *GenerateTiming) UnmarshalGQL(v any) error {
	str, ok := v.(string)
	if !ok {
		return fmt.Errorf("enums must be strings")
	}

	// validate before assigning so a rejected value never reaches the receiver
	parsed := GenerateTiming(str)
	if !parsed.IsValid() {
		return fmt.Errorf("%s is not a valid GenerateTiming", str)
	}

	*e = parsed
	return nil
}

// MarshalGQL writes e to w as a quoted string.
func (e GenerateTiming) MarshalGQL(w io.Writer) {
	fmt.Fprint(w, strconv.Quote(e.String()))
}

// UnmarshalJSON sets e from a JSON string. A JSON null is a no-op, following
// the encoding/json convention for Unmarshaler implementations. Any other
// input must be a quoted string naming a valid GenerateTiming.
func (e *GenerateTiming) UnmarshalJSON(b []byte) error {
	if string(b) == "null" {
		return nil
	}

	s, err := strconv.Unquote(string(b))
	if err != nil {
		return fmt.Errorf("decoding GenerateTiming: %w", err)
	}
	return e.UnmarshalGQL(s)
}

// MarshalJSON returns e encoded as a quoted JSON string.
func (e GenerateTiming) MarshalJSON() ([]byte, error) {
	var buf bytes.Buffer
	e.MarshalGQL(&buf)
	return buf.Bytes(), nil
}