diff --git a/internal/manager/task_scan.go b/internal/manager/task_scan.go index 22849124c..155090cd2 100644 --- a/internal/manager/task_scan.go +++ b/internal/manager/task_scan.go @@ -283,8 +283,10 @@ func (j *ScanJob) processQueue(ctx context.Context, parallelTasks int, progress for f := range j.fileQueue { logger.Tracef("Processing queued file %s", f.Path) - if err := ctx.Err(); err != nil { - return + if ctx.Err() != nil { + // Keep receiving until queueFiles closes the channel; otherwise + // the walker can block on send (full buffer) and never finish. + continue } wg.Add() diff --git a/pkg/job/job.go b/pkg/job/job.go index 48b5e7b13..835cf18ca 100644 --- a/pkg/job/job.go +++ b/pkg/job/job.go @@ -66,6 +66,23 @@ type Job struct { cancelFunc context.CancelFunc } +// statusCopy returns a copy of the Job with only the fields needed for +// status reporting. Internal fields (exec, cancelFunc, outerCtx) are +// excluded so that subscription channels don't retain heavy resources. +func (j *Job) statusCopy() Job { + return Job{ + ID: j.ID, + Status: j.Status, + Details: j.Details, + Description: j.Description, + Progress: j.Progress, + StartTime: j.StartTime, + EndTime: j.EndTime, + AddTime: j.AddTime, + Error: j.Error, + } +} + // TimeElapsed returns the total time elapsed for the job. // If the EndTime is set, then it uses this to calculate the elapsed time, otherwise it uses time.Now. func (j *Job) TimeElapsed() time.Duration { diff --git a/pkg/job/manager.go b/pkg/job/manager.go index 3e47d842b..ba62d102c 100644 --- a/pkg/job/manager.go +++ b/pkg/job/manager.go @@ -105,7 +105,7 @@ func (m *Manager) notifyNewJob(j *Job) { for _, s := range m.subscriptions { // don't block if channel is full select { - case s.newJob <- *j: + case s.newJob <- j.statusCopy(): default: } } @@ -232,7 +232,9 @@ func (m *Manager) removeJob(job *Job) { return } - // clear any subtasks + // release the executor and subtask details so they can be GC'd + // while the job remains in the graveyard for status reporting + job.exec = nil job.Details = nil m.queue = append(m.queue[:index], m.queue[index+1:]...) @@ -246,7 +248,7 @@ func (m *Manager) removeJob(job *Job) { for _, s := range m.subscriptions { // don't block if channel is full select { - case s.removedJob <- *job: + case s.removedJob <- job.statusCopy(): default: } } @@ -310,8 +312,7 @@ func (m *Manager) GetJob(id int) *Job { // get from the queue or graveyard _, j := m.getJob(append(m.queue, m.graveyard...), id) if j != nil { - // make a copy of the job and return the pointer - jCopy := *j + jCopy := j.statusCopy() return &jCopy } @@ -326,8 +327,7 @@ func (m *Manager) GetQueue() []Job { var ret []Job for _, j := range m.queue { - jCopy := *j - ret = append(ret, jCopy) + ret = append(ret, j.statusCopy()) } return ret @@ -372,7 +372,7 @@ func (m *Manager) notifyJobUpdate(j *Job) { for _, s := range m.subscriptions { // don't block if channel is full select { - case s.updatedJob <- *j: + case s.updatedJob <- j.statusCopy(): default: } } diff --git a/pkg/job/task.go b/pkg/job/task.go index fa0891e6f..6dd2cf02b 100644 --- a/pkg/job/task.go +++ b/pkg/job/task.go @@ -51,7 +51,7 @@ func (tq *TaskQueue) executer(ctx context.Context) { defer tq.wg.Wait() for task := range tq.tasks { if IsCancelled(ctx) { - return + continue // allow channel to continue draining until Close() } tt := task