fix: some tasks could be executed with the wrong priority

This commit is contained in:
Gauthier Roebroeck 2021-06-18 15:56:14 +08:00
parent 5fc77b2297
commit 2b6f534f84
4 changed files with 54 additions and 19 deletions

View file

@ -32,12 +32,14 @@ sealed class Task(priority: Int = DEFAULT_PRIORITY) : Serializable {
override fun toString(): String = "RefreshBookMetadata(bookId='$bookId', capabilities=$capabilities, priority='$priority')" override fun toString(): String = "RefreshBookMetadata(bookId='$bookId', capabilities=$capabilities, priority='$priority')"
} }
data class RefreshSeriesMetadata(val seriesId: String) : Task() { class RefreshSeriesMetadata(val seriesId: String, priority: Int = DEFAULT_PRIORITY) : Task(priority) {
override fun uniqueId() = "REFRESH_SERIES_METADATA_$seriesId" override fun uniqueId() = "REFRESH_SERIES_METADATA_$seriesId"
override fun toString(): String = "RefreshSeriesMetadata(seriesId='$seriesId', priority='$priority')"
} }
data class AggregateSeriesMetadata(val seriesId: String) : Task() { class AggregateSeriesMetadata(val seriesId: String, priority: Int = DEFAULT_PRIORITY) : Task(priority) {
override fun uniqueId() = "AGGREGATE_SERIES_METADATA_$seriesId" override fun uniqueId() = "AGGREGATE_SERIES_METADATA_$seriesId"
override fun toString(): String = "AggregateSeriesMetadata(seriesId='$seriesId', priority='$priority')"
} }
class RefreshBookLocalArtwork(val bookId: String, priority: Int = DEFAULT_PRIORITY) : Task(priority) { class RefreshBookLocalArtwork(val bookId: String, priority: Int = DEFAULT_PRIORITY) : Task(priority) {

View file

@ -64,13 +64,13 @@ class TaskHandler(
is Task.RefreshBookMetadata -> is Task.RefreshBookMetadata ->
bookRepository.findByIdOrNull(task.bookId)?.let { book -> bookRepository.findByIdOrNull(task.bookId)?.let { book ->
metadataLifecycle.refreshMetadata(book, task.capabilities) metadataLifecycle.refreshMetadata(book, task.capabilities)
taskReceiver.refreshSeriesMetadata(book.seriesId) taskReceiver.refreshSeriesMetadata(book.seriesId, priority = task.priority - 1)
} ?: logger.warn { "Cannot execute task $task: Book does not exist" } } ?: logger.warn { "Cannot execute task $task: Book does not exist" }
is Task.RefreshSeriesMetadata -> is Task.RefreshSeriesMetadata ->
seriesRepository.findByIdOrNull(task.seriesId)?.let { series -> seriesRepository.findByIdOrNull(task.seriesId)?.let { series ->
metadataLifecycle.refreshMetadata(series) metadataLifecycle.refreshMetadata(series)
taskReceiver.aggregateSeriesMetadata(series.id) taskReceiver.aggregateSeriesMetadata(series.id, priority = task.priority)
} ?: logger.warn { "Cannot execute task $task: Series does not exist" } } ?: logger.warn { "Cannot execute task $task: Series does not exist" }
is Task.AggregateSeriesMetadata -> is Task.AggregateSeriesMetadata ->

View file

@ -83,12 +83,12 @@ class TaskReceiver(
submitTask(Task.RefreshBookMetadata(bookId, capabilities, priority)) submitTask(Task.RefreshBookMetadata(bookId, capabilities, priority))
} }
fun refreshSeriesMetadata(seriesId: String) { fun refreshSeriesMetadata(seriesId: String, priority: Int = DEFAULT_PRIORITY) {
submitTask(Task.RefreshSeriesMetadata(seriesId)) submitTask(Task.RefreshSeriesMetadata(seriesId, priority))
} }
fun aggregateSeriesMetadata(seriesId: String) { fun aggregateSeriesMetadata(seriesId: String, priority: Int = DEFAULT_PRIORITY) {
submitTask(Task.AggregateSeriesMetadata(seriesId)) submitTask(Task.AggregateSeriesMetadata(seriesId, priority))
} }
fun refreshBookLocalArtwork(bookId: String, priority: Int = DEFAULT_PRIORITY) { fun refreshBookLocalArtwork(bookId: String, priority: Int = DEFAULT_PRIORITY) {

View file

@ -2,14 +2,20 @@ package org.gotson.komga.application.tasks
import com.ninjasquad.springmockk.MockkBean import com.ninjasquad.springmockk.MockkBean
import io.mockk.every import io.mockk.every
import io.mockk.just
import io.mockk.runs
import io.mockk.slot import io.mockk.slot
import io.mockk.verify import io.mockk.verify
import mu.KotlinLogging import mu.KotlinLogging
import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThat
import org.gotson.komga.domain.model.Book import org.gotson.komga.domain.model.Book
import org.gotson.komga.domain.model.Series
import org.gotson.komga.domain.model.makeBook import org.gotson.komga.domain.model.makeBook
import org.gotson.komga.domain.model.makeSeries
import org.gotson.komga.domain.persistence.BookRepository import org.gotson.komga.domain.persistence.BookRepository
import org.gotson.komga.domain.persistence.SeriesRepository
import org.gotson.komga.domain.service.BookLifecycle import org.gotson.komga.domain.service.BookLifecycle
import org.gotson.komga.domain.service.MetadataLifecycle
import org.gotson.komga.infrastructure.jms.QUEUE_TASKS import org.gotson.komga.infrastructure.jms.QUEUE_TASKS
import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
@ -34,6 +40,12 @@ class TaskHandlerTest(
@MockkBean @MockkBean
private lateinit var mockBookLifecycle: BookLifecycle private lateinit var mockBookLifecycle: BookLifecycle
@MockkBean
private lateinit var mockMetadataLifecycle: MetadataLifecycle
@MockkBean
private lateinit var mockSeriesRepository: SeriesRepository
@MockkBean @MockkBean
private lateinit var mockBookRepository: BookRepository private lateinit var mockBookRepository: BookRepository
@ -49,16 +61,14 @@ class TaskHandlerTest(
} }
@Test @Test
fun `when similar tasks are submitted then only a few are executed`() { fun `when similar tasks are submitted then only one is executed`() {
every { mockBookRepository.findByIdOrNull(any()) } returns makeBook("id") every { mockBookRepository.findByIdOrNull(any()) } returns makeBook("id")
every { mockBookLifecycle.analyzeAndPersist(any()) } returns false every { mockBookLifecycle.analyzeAndPersist(any()) } returns false
jmsListenerEndpointRegistry.stop() jmsListenerEndpointRegistry.stop()
repeat(100) { repeat(100) {
taskReceiver.analyzeBook("id") taskReceiver.analyzeBook("id")
} }
jmsListenerEndpointRegistry.start() jmsListenerEndpointRegistry.start()
Thread.sleep(1_00) Thread.sleep(1_00)
@ -76,15 +86,38 @@ class TaskHandlerTest(
} }
every { mockBookLifecycle.analyzeAndPersist(capture(calls)) } returns false every { mockBookLifecycle.analyzeAndPersist(capture(calls)) } returns false
taskReceiver.analyzeBook("1", HIGHEST_PRIORITY) jmsListenerEndpointRegistry.stop()
taskReceiver.analyzeBook("2", LOWEST_PRIORITY) (0..9).forEach {
taskReceiver.analyzeBook("3", HIGH_PRIORITY) taskReceiver.analyzeBook("$it", it)
taskReceiver.analyzeBook("4", HIGHEST_PRIORITY) }
taskReceiver.analyzeBook("5", DEFAULT_PRIORITY) jmsListenerEndpointRegistry.start()
Thread.sleep(1_000) Thread.sleep(3_000)
verify(exactly = 5) { mockBookLifecycle.analyzeAndPersist(any()) } verify(exactly = 10) { mockBookLifecycle.analyzeAndPersist(any()) }
assertThat(calls.map { it.name }).containsExactly("1", "4", "3", "5", "2") assertThat(calls.map { it.name }).containsExactlyElementsOf((9 downTo 0).map { "$it" })
}
@Test
fun `when high priority tasks triggering tasks are submitted then they are executed first`() {
val slot = slot<String>()
val calls = mutableListOf<Series>()
every { mockSeriesRepository.findByIdOrNull(capture(slot)) } answers {
Thread.sleep(1_00)
makeSeries(slot.captured)
}
every { mockMetadataLifecycle.refreshMetadata(capture(calls)) } just runs
every { mockMetadataLifecycle.aggregateMetadata(any()) } just runs
jmsListenerEndpointRegistry.stop()
(0..9).forEach {
taskReceiver.refreshSeriesMetadata("$it", it)
}
jmsListenerEndpointRegistry.start()
Thread.sleep(5_000)
verify(exactly = 10) { mockMetadataLifecycle.refreshMetadata(any()) }
assertThat(calls.map { it.name }).containsExactlyElementsOf((9 downTo 0).map { "$it" })
} }
} }