mirror of
https://github.com/stashapp/stash.git
synced 2026-05-07 12:00:24 +02:00
fix memory leak (#6796)
* allow channels to passively drain, empty fileQueue, scanner after scanning * Prevent job executor retention in subscription channels --------- Co-authored-by: feederbox826 <feederbox826@users.noreply.github.com> Co-authored-by: Gykes <Gykes@pm.me>
This commit is contained in:
parent
0ed2992a72
commit
672147deaf
4 changed files with 30 additions and 11 deletions
|
|
@ -283,8 +283,10 @@ func (j *ScanJob) processQueue(ctx context.Context, parallelTasks int, progress
|
|||
|
||||
for f := range j.fileQueue {
|
||||
logger.Tracef("Processing queued file %s", f.Path)
|
||||
if err := ctx.Err(); err != nil {
|
||||
return
|
||||
if ctx.Err() != nil {
|
||||
// Keep receiving until queueFiles closes the channel; otherwise
|
||||
// the walker can block on send (full buffer) and never finish.
|
||||
continue
|
||||
}
|
||||
|
||||
wg.Add()
|
||||
|
|
|
|||
|
|
@ -66,6 +66,23 @@ type Job struct {
|
|||
cancelFunc context.CancelFunc
|
||||
}
|
||||
|
||||
// statusCopy returns a copy of the Job with only the fields needed for
|
||||
// status reporting. Internal fields (exec, cancelFunc, outerCtx) are
|
||||
// excluded so that subscription channels don't retain heavy resources.
|
||||
func (j *Job) statusCopy() Job {
|
||||
return Job{
|
||||
ID: j.ID,
|
||||
Status: j.Status,
|
||||
Details: j.Details,
|
||||
Description: j.Description,
|
||||
Progress: j.Progress,
|
||||
StartTime: j.StartTime,
|
||||
EndTime: j.EndTime,
|
||||
AddTime: j.AddTime,
|
||||
Error: j.Error,
|
||||
}
|
||||
}
|
||||
|
||||
// TimeElapsed returns the total time elapsed for the job.
|
||||
// If the EndTime is set, then it uses this to calculate the elapsed time, otherwise it uses time.Now.
|
||||
func (j *Job) TimeElapsed() time.Duration {
|
||||
|
|
|
|||
|
|
@ -105,7 +105,7 @@ func (m *Manager) notifyNewJob(j *Job) {
|
|||
for _, s := range m.subscriptions {
|
||||
// don't block if channel is full
|
||||
select {
|
||||
case s.newJob <- *j:
|
||||
case s.newJob <- j.statusCopy():
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
|
@ -232,7 +232,9 @@ func (m *Manager) removeJob(job *Job) {
|
|||
return
|
||||
}
|
||||
|
||||
// clear any subtasks
|
||||
// release the executor and subtask details so they can be GC'd
|
||||
// while the job remains in the graveyard for status reporting
|
||||
job.exec = nil
|
||||
job.Details = nil
|
||||
|
||||
m.queue = append(m.queue[:index], m.queue[index+1:]...)
|
||||
|
|
@ -246,7 +248,7 @@ func (m *Manager) removeJob(job *Job) {
|
|||
for _, s := range m.subscriptions {
|
||||
// don't block if channel is full
|
||||
select {
|
||||
case s.removedJob <- *job:
|
||||
case s.removedJob <- job.statusCopy():
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
|
@ -310,8 +312,7 @@ func (m *Manager) GetJob(id int) *Job {
|
|||
// get from the queue or graveyard
|
||||
_, j := m.getJob(append(m.queue, m.graveyard...), id)
|
||||
if j != nil {
|
||||
// make a copy of the job and return the pointer
|
||||
jCopy := *j
|
||||
jCopy := j.statusCopy()
|
||||
return &jCopy
|
||||
}
|
||||
|
||||
|
|
@ -326,8 +327,7 @@ func (m *Manager) GetQueue() []Job {
|
|||
var ret []Job
|
||||
|
||||
for _, j := range m.queue {
|
||||
jCopy := *j
|
||||
ret = append(ret, jCopy)
|
||||
ret = append(ret, j.statusCopy())
|
||||
}
|
||||
|
||||
return ret
|
||||
|
|
@ -372,7 +372,7 @@ func (m *Manager) notifyJobUpdate(j *Job) {
|
|||
for _, s := range m.subscriptions {
|
||||
// don't block if channel is full
|
||||
select {
|
||||
case s.updatedJob <- *j:
|
||||
case s.updatedJob <- j.statusCopy():
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -51,7 +51,7 @@ func (tq *TaskQueue) executer(ctx context.Context) {
|
|||
defer tq.wg.Wait()
|
||||
for task := range tq.tasks {
|
||||
if IsCancelled(ctx) {
|
||||
return
|
||||
continue // allow channel to continue draining until Close()
|
||||
}
|
||||
|
||||
tt := task
|
||||
|
|
|
|||
Loading…
Reference in a new issue