fix: priority tasks

enable priority tasks in JMS
set high priority for all interactive tasks
prioritize book thumbnail and metadata after analysis
This commit is contained in:
Gauthier Roebroeck 2021-05-03 17:01:26 +08:00
parent a75807daf8
commit 6ee968be92
12 changed files with 118 additions and 70 deletions

View file

@ -4,23 +4,30 @@ import org.gotson.komga.domain.model.BookMetadataPatchCapability
import org.gotson.komga.domain.model.CopyMode import org.gotson.komga.domain.model.CopyMode
import java.io.Serializable import java.io.Serializable
sealed class Task : Serializable { const val HIGHEST_PRIORITY = 9
const val DEFAULT_PRIORITY = 4
sealed class Task(priority: Int = DEFAULT_PRIORITY) : Serializable {
abstract fun uniqueId(): String abstract fun uniqueId(): String
val priority = priority.coerceIn(0, 9)
data class ScanLibrary(val libraryId: String) : Task() { data class ScanLibrary(val libraryId: String) : Task() {
override fun uniqueId() = "SCAN_LIBRARY_$libraryId" override fun uniqueId() = "SCAN_LIBRARY_$libraryId"
} }
data class AnalyzeBook(val bookId: String) : Task() { class AnalyzeBook(val bookId: String, priority: Int = DEFAULT_PRIORITY) : Task(priority) {
override fun uniqueId() = "ANALYZE_BOOK_$bookId" override fun uniqueId() = "ANALYZE_BOOK_$bookId"
override fun toString(): String = "AnalyzeBook(bookId='$bookId', priority='$priority')"
} }
data class GenerateBookThumbnail(val bookId: String) : Task() { class GenerateBookThumbnail(val bookId: String, priority: Int = DEFAULT_PRIORITY) : Task(priority) {
override fun uniqueId() = "GENERATE_BOOK_THUMBNAIL_$bookId" override fun uniqueId() = "GENERATE_BOOK_THUMBNAIL_$bookId"
override fun toString(): String = "GenerateBookThumbnail(bookId='$bookId', priority='$priority')"
} }
data class RefreshBookMetadata(val bookId: String, val capabilities: List<BookMetadataPatchCapability>) : Task() { class RefreshBookMetadata(val bookId: String, val capabilities: List<BookMetadataPatchCapability>, priority: Int = DEFAULT_PRIORITY) : Task(priority) {
override fun uniqueId() = "REFRESH_BOOK_METADATA_$bookId" override fun uniqueId() = "REFRESH_BOOK_METADATA_$bookId"
override fun toString(): String = "RefreshBookMetadata(bookId='$bookId', capabilities=$capabilities, priority='$priority')"
} }
data class RefreshSeriesMetadata(val seriesId: String) : Task() { data class RefreshSeriesMetadata(val seriesId: String) : Task() {

View file

@ -45,8 +45,8 @@ class TaskHandler(
is Task.AnalyzeBook -> is Task.AnalyzeBook ->
bookRepository.findByIdOrNull(task.bookId)?.let { bookRepository.findByIdOrNull(task.bookId)?.let {
if (bookLifecycle.analyzeAndPersist(it)) { if (bookLifecycle.analyzeAndPersist(it)) {
taskReceiver.generateBookThumbnail(it.id) taskReceiver.generateBookThumbnail(it.id, priority = task.priority + 1)
taskReceiver.refreshBookMetadata(it) taskReceiver.refreshBookMetadata(it, priority = task.priority + 1)
} }
} ?: logger.warn { "Cannot execute task $task: Book does not exist" } } ?: logger.warn { "Cannot execute task $task: Book does not exist" }

View file

@ -13,6 +13,7 @@ import org.gotson.komga.infrastructure.jms.QUEUE_TASKS
import org.gotson.komga.infrastructure.jms.QUEUE_TASKS_TYPE import org.gotson.komga.infrastructure.jms.QUEUE_TASKS_TYPE
import org.gotson.komga.infrastructure.jms.QUEUE_TYPE import org.gotson.komga.infrastructure.jms.QUEUE_TYPE
import org.gotson.komga.infrastructure.jms.QUEUE_UNIQUE_ID import org.gotson.komga.infrastructure.jms.QUEUE_UNIQUE_ID
import org.springframework.data.domain.Sort
import org.springframework.jms.core.JmsTemplate import org.springframework.jms.core.JmsTemplate
import org.springframework.stereotype.Service import org.springframework.stereotype.Service
@ -38,36 +39,39 @@ class TaskReceiver(
BookSearch( BookSearch(
libraryIds = listOf(library.id), libraryIds = listOf(library.id),
mediaStatus = listOf(Media.Status.UNKNOWN, Media.Status.OUTDATED) mediaStatus = listOf(Media.Status.UNKNOWN, Media.Status.OUTDATED)
) ),
Sort.by(Sort.Order.asc("seriesId"), Sort.Order.asc("number"))
).forEach { ).forEach {
submitTask(Task.AnalyzeBook(it)) submitTask(Task.AnalyzeBook(it))
} }
} }
fun analyzeBook(bookId: String) { fun analyzeBook(bookId: String, priority: Int = DEFAULT_PRIORITY) {
submitTask(Task.AnalyzeBook(bookId)) submitTask(Task.AnalyzeBook(bookId, priority))
} }
fun analyzeBook(book: Book) { fun analyzeBook(book: Book, priority: Int = DEFAULT_PRIORITY) {
submitTask(Task.AnalyzeBook(book.id)) submitTask(Task.AnalyzeBook(book.id, priority))
} }
fun generateBookThumbnail(bookId: String) { fun generateBookThumbnail(bookId: String, priority: Int = DEFAULT_PRIORITY) {
submitTask(Task.GenerateBookThumbnail(bookId)) submitTask(Task.GenerateBookThumbnail(bookId, priority))
} }
fun refreshBookMetadata( fun refreshBookMetadata(
bookId: String, bookId: String,
capabilities: List<BookMetadataPatchCapability> = BookMetadataPatchCapability.values().toList() capabilities: List<BookMetadataPatchCapability> = BookMetadataPatchCapability.values().toList(),
priority: Int = DEFAULT_PRIORITY,
) { ) {
submitTask(Task.RefreshBookMetadata(bookId, capabilities)) submitTask(Task.RefreshBookMetadata(bookId, capabilities, priority))
} }
fun refreshBookMetadata( fun refreshBookMetadata(
book: Book, book: Book,
capabilities: List<BookMetadataPatchCapability> = BookMetadataPatchCapability.values().toList() capabilities: List<BookMetadataPatchCapability> = BookMetadataPatchCapability.values().toList(),
priority: Int = DEFAULT_PRIORITY,
) { ) {
submitTask(Task.RefreshBookMetadata(book.id, capabilities)) submitTask(Task.RefreshBookMetadata(book.id, capabilities, priority))
} }
fun refreshSeriesMetadata(seriesId: String) { fun refreshSeriesMetadata(seriesId: String) {
@ -84,6 +88,7 @@ class TaskReceiver(
private fun submitTask(task: Task) { private fun submitTask(task: Task) {
logger.info { "Sending task: $task" } logger.info { "Sending task: $task" }
jmsTemplate.priority = task.priority
jmsTemplate.convertAndSend(QUEUE_TASKS, task) { jmsTemplate.convertAndSend(QUEUE_TASKS, task) {
it.apply { it.apply {
setStringProperty(QUEUE_TYPE, QUEUE_TASKS_TYPE) setStringProperty(QUEUE_TYPE, QUEUE_TASKS_TYPE)

View file

@ -4,6 +4,7 @@ import org.gotson.komga.domain.model.Book
import org.gotson.komga.domain.model.BookSearch import org.gotson.komga.domain.model.BookSearch
import org.springframework.data.domain.Page import org.springframework.data.domain.Page
import org.springframework.data.domain.Pageable import org.springframework.data.domain.Pageable
import org.springframework.data.domain.Sort
interface BookRepository { interface BookRepository {
fun findByIdOrNull(bookId: String): Book? fun findByIdOrNull(bookId: String): Book?
@ -17,7 +18,7 @@ interface BookRepository {
fun findAllIdBySeriesId(seriesId: String): Collection<String> fun findAllIdBySeriesId(seriesId: String): Collection<String>
fun findAllIdBySeriesIds(seriesIds: Collection<String>): Collection<String> fun findAllIdBySeriesIds(seriesIds: Collection<String>): Collection<String>
fun findAllIdByLibraryId(libraryId: String): Collection<String> fun findAllIdByLibraryId(libraryId: String): Collection<String>
fun findAllId(bookSearch: BookSearch): Collection<String> fun findAllId(bookSearch: BookSearch, sort: Sort): Collection<String>
fun insert(book: Book) fun insert(book: Book)
fun insertMany(books: Collection<Book>) fun insertMany(books: Collection<Book>)

View file

@ -1,6 +1,7 @@
package org.gotson.komga.domain.service package org.gotson.komga.domain.service
import mu.KotlinLogging import mu.KotlinLogging
import org.gotson.komga.application.tasks.HIGHEST_PRIORITY
import org.gotson.komga.application.tasks.TaskReceiver import org.gotson.komga.application.tasks.TaskReceiver
import org.gotson.komga.domain.model.CopyMode import org.gotson.komga.domain.model.CopyMode
import org.gotson.komga.domain.model.Media import org.gotson.komga.domain.model.Media
@ -147,6 +148,6 @@ class BookImporter(
seriesLifecycle.sortBooks(series) seriesLifecycle.sortBooks(series)
taskReceiver.analyzeBook(importedBook) taskReceiver.analyzeBook(importedBook, HIGHEST_PRIORITY)
} }
} }

View file

@ -28,7 +28,9 @@ class BookDao(
private val d = Tables.BOOK_METADATA private val d = Tables.BOOK_METADATA
private val sorts = mapOf( private val sorts = mapOf(
"createdDate" to b.CREATED_DATE "createdDate" to b.CREATED_DATE,
"seriesId" to b.SERIES_ID,
"number" to b.NUMBER,
) )
override fun findByIdOrNull(bookId: String): Book? = override fun findByIdOrNull(bookId: String): Book? =
@ -125,14 +127,17 @@ class BookDao(
.where(b.LIBRARY_ID.eq(libraryId)) .where(b.LIBRARY_ID.eq(libraryId))
.fetch(0, String::class.java) .fetch(0, String::class.java)
override fun findAllId(bookSearch: BookSearch): Collection<String> { override fun findAllId(bookSearch: BookSearch, sort: Sort): Collection<String> {
val conditions = bookSearch.toCondition() val conditions = bookSearch.toCondition()
val orderBy = sort.toOrderBy(sorts)
return dsl.select(b.ID) return dsl.select(b.ID)
.from(b) .from(b)
.leftJoin(m).on(b.ID.eq(m.BOOK_ID)) .leftJoin(m).on(b.ID.eq(m.BOOK_ID))
.leftJoin(d).on(b.ID.eq(d.BOOK_ID)) .leftJoin(d).on(b.ID.eq(d.BOOK_ID))
.where(conditions) .where(conditions)
.orderBy(orderBy)
.fetch(0, String::class.java) .fetch(0, String::class.java)
} }

View file

@ -7,6 +7,7 @@ import io.swagger.v3.oas.annotations.media.Schema
import io.swagger.v3.oas.annotations.responses.ApiResponse import io.swagger.v3.oas.annotations.responses.ApiResponse
import mu.KotlinLogging import mu.KotlinLogging
import org.apache.commons.io.IOUtils import org.apache.commons.io.IOUtils
import org.gotson.komga.application.tasks.HIGHEST_PRIORITY
import org.gotson.komga.application.tasks.TaskReceiver import org.gotson.komga.application.tasks.TaskReceiver
import org.gotson.komga.domain.model.Author import org.gotson.komga.domain.model.Author
import org.gotson.komga.domain.model.BookSearchWithReadProgress import org.gotson.komga.domain.model.BookSearchWithReadProgress
@ -418,7 +419,7 @@ class BookController(
@ResponseStatus(HttpStatus.ACCEPTED) @ResponseStatus(HttpStatus.ACCEPTED)
fun analyze(@PathVariable bookId: String) { fun analyze(@PathVariable bookId: String) {
bookRepository.findByIdOrNull(bookId)?.let { book -> bookRepository.findByIdOrNull(bookId)?.let { book ->
taskReceiver.analyzeBook(book) taskReceiver.analyzeBook(book, HIGHEST_PRIORITY)
} ?: throw ResponseStatusException(HttpStatus.NOT_FOUND) } ?: throw ResponseStatusException(HttpStatus.NOT_FOUND)
} }
@ -427,7 +428,7 @@ class BookController(
@ResponseStatus(HttpStatus.ACCEPTED) @ResponseStatus(HttpStatus.ACCEPTED)
fun refreshMetadata(@PathVariable bookId: String) { fun refreshMetadata(@PathVariable bookId: String) {
bookRepository.findByIdOrNull(bookId)?.let { book -> bookRepository.findByIdOrNull(bookId)?.let { book ->
taskReceiver.refreshBookMetadata(book) taskReceiver.refreshBookMetadata(book, priority = HIGHEST_PRIORITY)
} ?: throw ResponseStatusException(HttpStatus.NOT_FOUND) } ?: throw ResponseStatusException(HttpStatus.NOT_FOUND)
} }

View file

@ -1,6 +1,7 @@
package org.gotson.komga.interfaces.rest package org.gotson.komga.interfaces.rest
import mu.KotlinLogging import mu.KotlinLogging
import org.gotson.komga.application.tasks.HIGHEST_PRIORITY
import org.gotson.komga.application.tasks.TaskReceiver import org.gotson.komga.application.tasks.TaskReceiver
import org.gotson.komga.domain.model.DirectoryNotFoundException import org.gotson.komga.domain.model.DirectoryNotFoundException
import org.gotson.komga.domain.model.DuplicateNameException import org.gotson.komga.domain.model.DuplicateNameException
@ -146,7 +147,7 @@ class LibraryController(
@ResponseStatus(HttpStatus.ACCEPTED) @ResponseStatus(HttpStatus.ACCEPTED)
fun analyze(@PathVariable libraryId: String) { fun analyze(@PathVariable libraryId: String) {
bookRepository.findAllIdByLibraryId(libraryId).forEach { bookRepository.findAllIdByLibraryId(libraryId).forEach {
taskReceiver.analyzeBook(it) taskReceiver.analyzeBook(it, HIGHEST_PRIORITY)
} }
} }
@ -155,7 +156,7 @@ class LibraryController(
@ResponseStatus(HttpStatus.ACCEPTED) @ResponseStatus(HttpStatus.ACCEPTED)
fun refreshMetadata(@PathVariable libraryId: String) { fun refreshMetadata(@PathVariable libraryId: String) {
bookRepository.findAllIdByLibraryId(libraryId).forEach { bookRepository.findAllIdByLibraryId(libraryId).forEach {
taskReceiver.refreshBookMetadata(it) taskReceiver.refreshBookMetadata(it, priority = HIGHEST_PRIORITY)
} }
} }
} }

View file

@ -9,6 +9,7 @@ import mu.KotlinLogging
import org.apache.commons.compress.archivers.zip.ZipArchiveEntry import org.apache.commons.compress.archivers.zip.ZipArchiveEntry
import org.apache.commons.compress.archivers.zip.ZipArchiveOutputStream import org.apache.commons.compress.archivers.zip.ZipArchiveOutputStream
import org.apache.commons.io.IOUtils import org.apache.commons.io.IOUtils
import org.gotson.komga.application.tasks.HIGHEST_PRIORITY
import org.gotson.komga.application.tasks.TaskReceiver import org.gotson.komga.application.tasks.TaskReceiver
import org.gotson.komga.domain.model.Author import org.gotson.komga.domain.model.Author
import org.gotson.komga.domain.model.BookSearchWithReadProgress import org.gotson.komga.domain.model.BookSearchWithReadProgress
@ -293,7 +294,7 @@ class SeriesController(
@ResponseStatus(HttpStatus.ACCEPTED) @ResponseStatus(HttpStatus.ACCEPTED)
fun analyze(@PathVariable seriesId: String) { fun analyze(@PathVariable seriesId: String) {
bookRepository.findAllIdBySeriesId(seriesId).forEach { bookRepository.findAllIdBySeriesId(seriesId).forEach {
taskReceiver.analyzeBook(it) taskReceiver.analyzeBook(it, HIGHEST_PRIORITY)
} }
} }
@ -302,7 +303,7 @@ class SeriesController(
@ResponseStatus(HttpStatus.ACCEPTED) @ResponseStatus(HttpStatus.ACCEPTED)
fun refreshMetadata(@PathVariable seriesId: String) { fun refreshMetadata(@PathVariable seriesId: String) {
bookRepository.findAllIdBySeriesId(seriesId).forEach { bookRepository.findAllIdBySeriesId(seriesId).forEach {
taskReceiver.refreshBookMetadata(it) taskReceiver.refreshBookMetadata(it, priority = HIGHEST_PRIORITY)
} }
} }

View file

@ -33,6 +33,7 @@ spring:
web: web:
resources: resources:
add-mappings: false add-mappings: false
jms.template.qos-enabled: true
server: server:
servlet.session.timeout: 7d servlet.session.timeout: 7d

View file

@ -2,26 +2,21 @@ package org.gotson.komga.application.tasks
import com.ninjasquad.springmockk.MockkBean import com.ninjasquad.springmockk.MockkBean
import io.mockk.every import io.mockk.every
import io.mockk.just import io.mockk.slot
import io.mockk.runs
import io.mockk.verify import io.mockk.verify
import mu.KotlinLogging import mu.KotlinLogging
import org.assertj.core.api.Assertions.assertThat
import org.gotson.komga.domain.model.Book
import org.gotson.komga.domain.model.makeBook import org.gotson.komga.domain.model.makeBook
import org.gotson.komga.domain.model.makeLibrary
import org.gotson.komga.domain.model.makeSeries
import org.gotson.komga.domain.persistence.BookRepository import org.gotson.komga.domain.persistence.BookRepository
import org.gotson.komga.domain.persistence.LibraryRepository import org.gotson.komga.domain.service.BookLifecycle
import org.gotson.komga.domain.service.LibraryLifecycle
import org.gotson.komga.domain.service.MetadataLifecycle
import org.gotson.komga.domain.service.SeriesLifecycle
import org.gotson.komga.infrastructure.jms.QUEUE_TASKS import org.gotson.komga.infrastructure.jms.QUEUE_TASKS
import org.junit.jupiter.api.AfterAll
import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtendWith import org.junit.jupiter.api.extension.ExtendWith
import org.springframework.beans.factory.annotation.Autowired import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest import org.springframework.boot.test.context.SpringBootTest
import org.springframework.jms.config.JmsListenerEndpointRegistry
import org.springframework.jms.core.JmsTemplate import org.springframework.jms.core.JmsTemplate
import org.springframework.jms.support.destination.JmsDestinationAccessor import org.springframework.jms.support.destination.JmsDestinationAccessor
import org.springframework.test.context.junit.jupiter.SpringExtension import org.springframework.test.context.junit.jupiter.SpringExtension
@ -33,36 +28,19 @@ private val logger = KotlinLogging.logger {}
class TaskHandlerTest( class TaskHandlerTest(
@Autowired private val taskReceiver: TaskReceiver, @Autowired private val taskReceiver: TaskReceiver,
@Autowired private val jmsTemplate: JmsTemplate, @Autowired private val jmsTemplate: JmsTemplate,
@Autowired private val libraryRepository: LibraryRepository, @Autowired private val jmsListenerEndpointRegistry: JmsListenerEndpointRegistry,
@Autowired private val bookRepository: BookRepository,
@Autowired private val seriesLifecycle: SeriesLifecycle,
@Autowired private val libraryLifecycle: LibraryLifecycle
) { ) {
@MockkBean @MockkBean
private lateinit var mockMetadataLifecycle: MetadataLifecycle private lateinit var mockBookLifecycle: BookLifecycle
private val library = makeLibrary() @MockkBean
private lateinit var mockBookRepository: BookRepository
init { init {
jmsTemplate.receiveTimeout = JmsDestinationAccessor.RECEIVE_TIMEOUT_NO_WAIT jmsTemplate.receiveTimeout = JmsDestinationAccessor.RECEIVE_TIMEOUT_NO_WAIT
} }
@BeforeAll
fun `setup library`() {
libraryRepository.insert(library)
}
@AfterAll
fun `teardown library`() {
libraryRepository.deleteAll()
}
@AfterEach
fun `clear repository`() {
libraryLifecycle.deleteLibrary(library)
}
@AfterEach @AfterEach
fun emptyQueue() { fun emptyQueue() {
while (jmsTemplate.receive(QUEUE_TASKS) != null) { while (jmsTemplate.receive(QUEUE_TASKS) != null) {
@ -72,23 +50,39 @@ class TaskHandlerTest(
@Test @Test
fun `when similar tasks are submitted then only a few are executed`() { fun `when similar tasks are submitted then only a few are executed`() {
val book = makeBook("book", libraryId = library.id) every { mockBookRepository.findByIdOrNull(any()) } returns makeBook("id")
val series = makeSeries("series", libraryId = library.id) every { mockBookLifecycle.analyzeAndPersist(any()) } returns false
seriesLifecycle.createSeries(series).let {
seriesLifecycle.addBooks(it, listOf(book))
}
every { mockMetadataLifecycle.refreshMetadata(any(), any()) } answers { Thread.sleep(1_000) } jmsListenerEndpointRegistry.stop()
every { mockMetadataLifecycle.refreshMetadata(any()) } just runs
val createdBook = bookRepository.findAll().first()
repeat(100) { repeat(100) {
taskReceiver.refreshBookMetadata(createdBook) taskReceiver.analyzeBook("id")
} }
Thread.sleep(5_000) jmsListenerEndpointRegistry.start()
verify(atLeast = 1, atMost = 3) { mockMetadataLifecycle.refreshMetadata(any(), any()) } Thread.sleep(1_00)
verify(exactly = 1) { mockBookLifecycle.analyzeAndPersist(any()) }
}
@Test
fun `when high priority tasks are submitted then they are executed first`() {
val slot = slot<String>()
val calls = mutableListOf<Book>()
every { mockBookRepository.findByIdOrNull(capture(slot)) } answers { makeBook(slot.captured) }
every { mockBookLifecycle.analyzeAndPersist(capture(calls)) } returns false
jmsListenerEndpointRegistry.stop()
taskReceiver.analyzeBook("1")
taskReceiver.analyzeBook("2", HIGHEST_PRIORITY)
jmsListenerEndpointRegistry.start()
Thread.sleep(1_00)
verify(exactly = 2) { mockBookLifecycle.analyzeAndPersist(any()) }
assertThat(calls.map { it.name }).containsExactly("2", "1")
} }
} }

View file

@ -92,4 +92,35 @@ class ArtemisConfigTest(
assertThat(msg).isEqualTo("message 1") assertThat(msg).isEqualTo("message 1")
assertThat(size).isEqualTo(5) assertThat(size).isEqualTo(5)
} }
@Test
fun `when sending messages with different priority then high priority messages are received first`() {
for (i in 0..9) {
jmsTemplate.priority = i
jmsTemplate.convertAndSend(
QUEUE_TASKS,
"message A $i"
)
}
for (i in 9 downTo 0) {
jmsTemplate.priority = i
jmsTemplate.convertAndSend(
QUEUE_TASKS,
"message B $i"
)
}
val size = jmsTemplate.browse(QUEUE_TASKS) { _: Session, browser: QueueBrowser ->
browser.enumeration.toList().size
}
assertThat(size).isEqualTo(20)
for (i in 9 downTo 0) {
val msgA = jmsTemplate.receiveAndConvert(QUEUE_TASKS) as String
assertThat(msgA).isEqualTo("message A $i")
val msgB = jmsTemplate.receiveAndConvert(QUEUE_TASKS) as String
assertThat(msgB).isEqualTo("message B $i")
}
}
} }