fix: priority tasks

enable priority tasks in JMS
set high priority for all interactive tasks
prioritize book thumbnail and metadata after analysis
This commit is contained in:
Gauthier Roebroeck 2021-05-03 17:01:26 +08:00
parent a75807daf8
commit 6ee968be92
12 changed files with 118 additions and 70 deletions

View file

@ -4,23 +4,30 @@ import org.gotson.komga.domain.model.BookMetadataPatchCapability
import org.gotson.komga.domain.model.CopyMode
import java.io.Serializable
sealed class Task : Serializable {
const val HIGHEST_PRIORITY = 9
const val DEFAULT_PRIORITY = 4
sealed class Task(priority: Int = DEFAULT_PRIORITY) : Serializable {
abstract fun uniqueId(): String
val priority = priority.coerceIn(0, 9)
data class ScanLibrary(val libraryId: String) : Task() {
override fun uniqueId() = "SCAN_LIBRARY_$libraryId"
}
data class AnalyzeBook(val bookId: String) : Task() {
class AnalyzeBook(val bookId: String, priority: Int = DEFAULT_PRIORITY) : Task(priority) {
override fun uniqueId() = "ANALYZE_BOOK_$bookId"
override fun toString(): String = "AnalyzeBook(bookId='$bookId', priority='$priority')"
}
data class GenerateBookThumbnail(val bookId: String) : Task() {
class GenerateBookThumbnail(val bookId: String, priority: Int = DEFAULT_PRIORITY) : Task(priority) {
override fun uniqueId() = "GENERATE_BOOK_THUMBNAIL_$bookId"
override fun toString(): String = "GenerateBookThumbnail(bookId='$bookId', priority='$priority')"
}
data class RefreshBookMetadata(val bookId: String, val capabilities: List<BookMetadataPatchCapability>) : Task() {
class RefreshBookMetadata(val bookId: String, val capabilities: List<BookMetadataPatchCapability>, priority: Int = DEFAULT_PRIORITY) : Task(priority) {
override fun uniqueId() = "REFRESH_BOOK_METADATA_$bookId"
override fun toString(): String = "RefreshBookMetadata(bookId='$bookId', capabilities=$capabilities, priority='$priority')"
}
data class RefreshSeriesMetadata(val seriesId: String) : Task() {

View file

@ -45,8 +45,8 @@ class TaskHandler(
is Task.AnalyzeBook ->
bookRepository.findByIdOrNull(task.bookId)?.let {
if (bookLifecycle.analyzeAndPersist(it)) {
taskReceiver.generateBookThumbnail(it.id)
taskReceiver.refreshBookMetadata(it)
taskReceiver.generateBookThumbnail(it.id, priority = task.priority + 1)
taskReceiver.refreshBookMetadata(it, priority = task.priority + 1)
}
} ?: logger.warn { "Cannot execute task $task: Book does not exist" }

View file

@ -13,6 +13,7 @@ import org.gotson.komga.infrastructure.jms.QUEUE_TASKS
import org.gotson.komga.infrastructure.jms.QUEUE_TASKS_TYPE
import org.gotson.komga.infrastructure.jms.QUEUE_TYPE
import org.gotson.komga.infrastructure.jms.QUEUE_UNIQUE_ID
import org.springframework.data.domain.Sort
import org.springframework.jms.core.JmsTemplate
import org.springframework.stereotype.Service
@ -38,36 +39,39 @@ class TaskReceiver(
BookSearch(
libraryIds = listOf(library.id),
mediaStatus = listOf(Media.Status.UNKNOWN, Media.Status.OUTDATED)
)
),
Sort.by(Sort.Order.asc("seriesId"), Sort.Order.asc("number"))
).forEach {
submitTask(Task.AnalyzeBook(it))
}
}
fun analyzeBook(bookId: String) {
submitTask(Task.AnalyzeBook(bookId))
fun analyzeBook(bookId: String, priority: Int = DEFAULT_PRIORITY) {
submitTask(Task.AnalyzeBook(bookId, priority))
}
fun analyzeBook(book: Book) {
submitTask(Task.AnalyzeBook(book.id))
fun analyzeBook(book: Book, priority: Int = DEFAULT_PRIORITY) {
submitTask(Task.AnalyzeBook(book.id, priority))
}
fun generateBookThumbnail(bookId: String) {
submitTask(Task.GenerateBookThumbnail(bookId))
fun generateBookThumbnail(bookId: String, priority: Int = DEFAULT_PRIORITY) {
submitTask(Task.GenerateBookThumbnail(bookId, priority))
}
fun refreshBookMetadata(
bookId: String,
capabilities: List<BookMetadataPatchCapability> = BookMetadataPatchCapability.values().toList()
capabilities: List<BookMetadataPatchCapability> = BookMetadataPatchCapability.values().toList(),
priority: Int = DEFAULT_PRIORITY,
) {
submitTask(Task.RefreshBookMetadata(bookId, capabilities))
submitTask(Task.RefreshBookMetadata(bookId, capabilities, priority))
}
fun refreshBookMetadata(
book: Book,
capabilities: List<BookMetadataPatchCapability> = BookMetadataPatchCapability.values().toList()
capabilities: List<BookMetadataPatchCapability> = BookMetadataPatchCapability.values().toList(),
priority: Int = DEFAULT_PRIORITY,
) {
submitTask(Task.RefreshBookMetadata(book.id, capabilities))
submitTask(Task.RefreshBookMetadata(book.id, capabilities, priority))
}
fun refreshSeriesMetadata(seriesId: String) {
@ -84,6 +88,7 @@ class TaskReceiver(
private fun submitTask(task: Task) {
logger.info { "Sending task: $task" }
jmsTemplate.priority = task.priority
jmsTemplate.convertAndSend(QUEUE_TASKS, task) {
it.apply {
setStringProperty(QUEUE_TYPE, QUEUE_TASKS_TYPE)

View file

@ -4,6 +4,7 @@ import org.gotson.komga.domain.model.Book
import org.gotson.komga.domain.model.BookSearch
import org.springframework.data.domain.Page
import org.springframework.data.domain.Pageable
import org.springframework.data.domain.Sort
interface BookRepository {
fun findByIdOrNull(bookId: String): Book?
@ -17,7 +18,7 @@ interface BookRepository {
fun findAllIdBySeriesId(seriesId: String): Collection<String>
fun findAllIdBySeriesIds(seriesIds: Collection<String>): Collection<String>
fun findAllIdByLibraryId(libraryId: String): Collection<String>
fun findAllId(bookSearch: BookSearch): Collection<String>
fun findAllId(bookSearch: BookSearch, sort: Sort): Collection<String>
fun insert(book: Book)
fun insertMany(books: Collection<Book>)

View file

@ -1,6 +1,7 @@
package org.gotson.komga.domain.service
import mu.KotlinLogging
import org.gotson.komga.application.tasks.HIGHEST_PRIORITY
import org.gotson.komga.application.tasks.TaskReceiver
import org.gotson.komga.domain.model.CopyMode
import org.gotson.komga.domain.model.Media
@ -147,6 +148,6 @@ class BookImporter(
seriesLifecycle.sortBooks(series)
taskReceiver.analyzeBook(importedBook)
taskReceiver.analyzeBook(importedBook, HIGHEST_PRIORITY)
}
}

View file

@ -28,7 +28,9 @@ class BookDao(
private val d = Tables.BOOK_METADATA
private val sorts = mapOf(
"createdDate" to b.CREATED_DATE
"createdDate" to b.CREATED_DATE,
"seriesId" to b.SERIES_ID,
"number" to b.NUMBER,
)
override fun findByIdOrNull(bookId: String): Book? =
@ -125,14 +127,17 @@ class BookDao(
.where(b.LIBRARY_ID.eq(libraryId))
.fetch(0, String::class.java)
override fun findAllId(bookSearch: BookSearch): Collection<String> {
override fun findAllId(bookSearch: BookSearch, sort: Sort): Collection<String> {
val conditions = bookSearch.toCondition()
val orderBy = sort.toOrderBy(sorts)
return dsl.select(b.ID)
.from(b)
.leftJoin(m).on(b.ID.eq(m.BOOK_ID))
.leftJoin(d).on(b.ID.eq(d.BOOK_ID))
.where(conditions)
.orderBy(orderBy)
.fetch(0, String::class.java)
}

View file

@ -7,6 +7,7 @@ import io.swagger.v3.oas.annotations.media.Schema
import io.swagger.v3.oas.annotations.responses.ApiResponse
import mu.KotlinLogging
import org.apache.commons.io.IOUtils
import org.gotson.komga.application.tasks.HIGHEST_PRIORITY
import org.gotson.komga.application.tasks.TaskReceiver
import org.gotson.komga.domain.model.Author
import org.gotson.komga.domain.model.BookSearchWithReadProgress
@ -418,7 +419,7 @@ class BookController(
@ResponseStatus(HttpStatus.ACCEPTED)
fun analyze(@PathVariable bookId: String) {
bookRepository.findByIdOrNull(bookId)?.let { book ->
taskReceiver.analyzeBook(book)
taskReceiver.analyzeBook(book, HIGHEST_PRIORITY)
} ?: throw ResponseStatusException(HttpStatus.NOT_FOUND)
}
@ -427,7 +428,7 @@ class BookController(
@ResponseStatus(HttpStatus.ACCEPTED)
fun refreshMetadata(@PathVariable bookId: String) {
bookRepository.findByIdOrNull(bookId)?.let { book ->
taskReceiver.refreshBookMetadata(book)
taskReceiver.refreshBookMetadata(book, priority = HIGHEST_PRIORITY)
} ?: throw ResponseStatusException(HttpStatus.NOT_FOUND)
}

View file

@ -1,6 +1,7 @@
package org.gotson.komga.interfaces.rest
import mu.KotlinLogging
import org.gotson.komga.application.tasks.HIGHEST_PRIORITY
import org.gotson.komga.application.tasks.TaskReceiver
import org.gotson.komga.domain.model.DirectoryNotFoundException
import org.gotson.komga.domain.model.DuplicateNameException
@ -146,7 +147,7 @@ class LibraryController(
@ResponseStatus(HttpStatus.ACCEPTED)
fun analyze(@PathVariable libraryId: String) {
bookRepository.findAllIdByLibraryId(libraryId).forEach {
taskReceiver.analyzeBook(it)
taskReceiver.analyzeBook(it, HIGHEST_PRIORITY)
}
}
@ -155,7 +156,7 @@ class LibraryController(
@ResponseStatus(HttpStatus.ACCEPTED)
fun refreshMetadata(@PathVariable libraryId: String) {
bookRepository.findAllIdByLibraryId(libraryId).forEach {
taskReceiver.refreshBookMetadata(it)
taskReceiver.refreshBookMetadata(it, priority = HIGHEST_PRIORITY)
}
}
}

View file

@ -9,6 +9,7 @@ import mu.KotlinLogging
import org.apache.commons.compress.archivers.zip.ZipArchiveEntry
import org.apache.commons.compress.archivers.zip.ZipArchiveOutputStream
import org.apache.commons.io.IOUtils
import org.gotson.komga.application.tasks.HIGHEST_PRIORITY
import org.gotson.komga.application.tasks.TaskReceiver
import org.gotson.komga.domain.model.Author
import org.gotson.komga.domain.model.BookSearchWithReadProgress
@ -293,7 +294,7 @@ class SeriesController(
@ResponseStatus(HttpStatus.ACCEPTED)
fun analyze(@PathVariable seriesId: String) {
bookRepository.findAllIdBySeriesId(seriesId).forEach {
taskReceiver.analyzeBook(it)
taskReceiver.analyzeBook(it, HIGHEST_PRIORITY)
}
}
@ -302,7 +303,7 @@ class SeriesController(
@ResponseStatus(HttpStatus.ACCEPTED)
fun refreshMetadata(@PathVariable seriesId: String) {
bookRepository.findAllIdBySeriesId(seriesId).forEach {
taskReceiver.refreshBookMetadata(it)
taskReceiver.refreshBookMetadata(it, priority = HIGHEST_PRIORITY)
}
}

View file

@ -33,6 +33,7 @@ spring:
web:
resources:
add-mappings: false
jms.template.qos-enabled: true
server:
servlet.session.timeout: 7d

View file

@ -2,26 +2,21 @@ package org.gotson.komga.application.tasks
import com.ninjasquad.springmockk.MockkBean
import io.mockk.every
import io.mockk.just
import io.mockk.runs
import io.mockk.slot
import io.mockk.verify
import mu.KotlinLogging
import org.assertj.core.api.Assertions.assertThat
import org.gotson.komga.domain.model.Book
import org.gotson.komga.domain.model.makeBook
import org.gotson.komga.domain.model.makeLibrary
import org.gotson.komga.domain.model.makeSeries
import org.gotson.komga.domain.persistence.BookRepository
import org.gotson.komga.domain.persistence.LibraryRepository
import org.gotson.komga.domain.service.LibraryLifecycle
import org.gotson.komga.domain.service.MetadataLifecycle
import org.gotson.komga.domain.service.SeriesLifecycle
import org.gotson.komga.domain.service.BookLifecycle
import org.gotson.komga.infrastructure.jms.QUEUE_TASKS
import org.junit.jupiter.api.AfterAll
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtendWith
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.jms.config.JmsListenerEndpointRegistry
import org.springframework.jms.core.JmsTemplate
import org.springframework.jms.support.destination.JmsDestinationAccessor
import org.springframework.test.context.junit.jupiter.SpringExtension
@ -33,36 +28,19 @@ private val logger = KotlinLogging.logger {}
class TaskHandlerTest(
@Autowired private val taskReceiver: TaskReceiver,
@Autowired private val jmsTemplate: JmsTemplate,
@Autowired private val libraryRepository: LibraryRepository,
@Autowired private val bookRepository: BookRepository,
@Autowired private val seriesLifecycle: SeriesLifecycle,
@Autowired private val libraryLifecycle: LibraryLifecycle
@Autowired private val jmsListenerEndpointRegistry: JmsListenerEndpointRegistry,
) {
@MockkBean
private lateinit var mockMetadataLifecycle: MetadataLifecycle
private lateinit var mockBookLifecycle: BookLifecycle
private val library = makeLibrary()
@MockkBean
private lateinit var mockBookRepository: BookRepository
init {
jmsTemplate.receiveTimeout = JmsDestinationAccessor.RECEIVE_TIMEOUT_NO_WAIT
}
@BeforeAll
fun `setup library`() {
libraryRepository.insert(library)
}
@AfterAll
fun `teardown library`() {
libraryRepository.deleteAll()
}
@AfterEach
fun `clear repository`() {
libraryLifecycle.deleteLibrary(library)
}
@AfterEach
fun emptyQueue() {
while (jmsTemplate.receive(QUEUE_TASKS) != null) {
@ -72,23 +50,39 @@ class TaskHandlerTest(
@Test
fun `when similar tasks are submitted then only a few are executed`() {
val book = makeBook("book", libraryId = library.id)
val series = makeSeries("series", libraryId = library.id)
seriesLifecycle.createSeries(series).let {
seriesLifecycle.addBooks(it, listOf(book))
}
every { mockBookRepository.findByIdOrNull(any()) } returns makeBook("id")
every { mockBookLifecycle.analyzeAndPersist(any()) } returns false
every { mockMetadataLifecycle.refreshMetadata(any(), any()) } answers { Thread.sleep(1_000) }
every { mockMetadataLifecycle.refreshMetadata(any()) } just runs
val createdBook = bookRepository.findAll().first()
jmsListenerEndpointRegistry.stop()
repeat(100) {
taskReceiver.refreshBookMetadata(createdBook)
taskReceiver.analyzeBook("id")
}
Thread.sleep(5_000)
jmsListenerEndpointRegistry.start()
verify(atLeast = 1, atMost = 3) { mockMetadataLifecycle.refreshMetadata(any(), any()) }
Thread.sleep(1_00)
verify(exactly = 1) { mockBookLifecycle.analyzeAndPersist(any()) }
}
@Test
fun `when high priority tasks are submitted then they are executed first`() {
val slot = slot<String>()
val calls = mutableListOf<Book>()
every { mockBookRepository.findByIdOrNull(capture(slot)) } answers { makeBook(slot.captured) }
every { mockBookLifecycle.analyzeAndPersist(capture(calls)) } returns false
jmsListenerEndpointRegistry.stop()
taskReceiver.analyzeBook("1")
taskReceiver.analyzeBook("2", HIGHEST_PRIORITY)
jmsListenerEndpointRegistry.start()
Thread.sleep(1_00)
verify(exactly = 2) { mockBookLifecycle.analyzeAndPersist(any()) }
assertThat(calls.map { it.name }).containsExactly("2", "1")
}
}

View file

@ -92,4 +92,35 @@ class ArtemisConfigTest(
assertThat(msg).isEqualTo("message 1")
assertThat(size).isEqualTo(5)
}
@Test
fun `when sending messages with different priority then high priority messages are received first`() {
for (i in 0..9) {
jmsTemplate.priority = i
jmsTemplate.convertAndSend(
QUEUE_TASKS,
"message A $i"
)
}
for (i in 9 downTo 0) {
jmsTemplate.priority = i
jmsTemplate.convertAndSend(
QUEUE_TASKS,
"message B $i"
)
}
val size = jmsTemplate.browse(QUEUE_TASKS) { _: Session, browser: QueueBrowser ->
browser.enumeration.toList().size
}
assertThat(size).isEqualTo(20)
for (i in 9 downTo 0) {
val msgA = jmsTemplate.receiveAndConvert(QUEUE_TASKS) as String
assertThat(msgA).isEqualTo("message A $i")
val msgB = jmsTemplate.receiveAndConvert(QUEUE_TASKS) as String
assertThat(msgB).isEqualTo("message B $i")
}
}
}