feat(tasks): replace background tasks management

Use of Apache Artemis message queue instead async methods with executors.
Tasks are submitted to TaskReceiver, and handled by TaskHandler.
The Artemis queue is configured with last-value, which automatically remove duplicate tasks in the queue.
This commit is contained in:
Gauthier Roebroeck 2020-04-24 15:02:09 +08:00
parent e7f4e203fb
commit 60ce87a25d
21 changed files with 388 additions and 361 deletions

View file

@ -38,9 +38,12 @@ dependencies {
implementation("org.springframework.boot:spring-boot-starter-cache") implementation("org.springframework.boot:spring-boot-starter-cache")
implementation("org.springframework.boot:spring-boot-starter-security") implementation("org.springframework.boot:spring-boot-starter-security")
implementation("org.springframework.boot:spring-boot-starter-thymeleaf") implementation("org.springframework.boot:spring-boot-starter-thymeleaf")
implementation("org.springframework.boot:spring-boot-starter-artemis")
kapt("org.springframework.boot:spring-boot-configuration-processor") kapt("org.springframework.boot:spring-boot-configuration-processor")
implementation("org.apache.activemq:artemis-jms-server")
implementation("org.flywaydb:flyway-core") implementation("org.flywaydb:flyway-core")
implementation("org.hibernate:hibernate-jcache") implementation("org.hibernate:hibernate-jcache")
@ -93,6 +96,7 @@ dependencies {
testImplementation("com.tngtech.archunit:archunit-junit5:0.13.1") testImplementation("com.tngtech.archunit:archunit-junit5:0.13.1")
developmentOnly("org.springframework.boot:spring-boot-devtools") developmentOnly("org.springframework.boot:spring-boot-devtools")
} }
@ -100,7 +104,7 @@ tasks {
withType<KotlinCompile> { withType<KotlinCompile> {
kotlinOptions { kotlinOptions {
jvmTarget = "1.8" jvmTarget = "1.8"
freeCompilerArgs = listOf("-Xjsr305=strict") freeCompilerArgs = listOf("-Xjsr305=strict", "-Xopt-in=kotlin.time.ExperimentalTime")
} }
} }

View file

@ -1,89 +0,0 @@
package org.gotson.komga.application.service
import mu.KotlinLogging
import org.apache.commons.lang3.time.DurationFormatUtils
import org.gotson.komga.domain.model.Book
import org.gotson.komga.domain.model.Library
import org.gotson.komga.domain.persistence.BookRepository
import org.gotson.komga.domain.persistence.LibraryRepository
import org.gotson.komga.domain.persistence.SeriesRepository
import org.gotson.komga.domain.service.LibraryScanner
import org.springframework.scheduling.annotation.Async
import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional
import kotlin.system.measureTimeMillis
private val logger = KotlinLogging.logger {}
@Service
class AsyncOrchestrator(
private val libraryScanner: LibraryScanner,
private val libraryRepository: LibraryRepository,
private val bookRepository: BookRepository,
private val bookLifecycle: BookLifecycle,
private val seriesRepository: SeriesRepository,
private val metadataLifecycle: MetadataLifecycle
) {
@Async("periodicScanTaskExecutor")
fun scanAndAnalyzeAllLibraries() {
logger.info { "Starting periodic libraries scan" }
val libraries = libraryRepository.findAll()
if (libraries.isEmpty()) {
logger.info { "No libraries defined, nothing to scan" }
} else {
libraries.forEach {
libraryScanner.scanRootFolder(it)
}
logger.info { "Starting periodic book parsing" }
libraryScanner.analyzeUnknownBooks()
}
}
@Async("periodicScanTaskExecutor")
fun scanAndAnalyzeOneLibrary(library: Library) {
libraryScanner.scanRootFolder(library)
libraryScanner.analyzeUnknownBooks()
}
@Async("regenerateThumbnailsTaskExecutor")
@Transactional
fun generateThumbnails(books: List<Book>) {
val loadedBooks = bookRepository.findAllById(books.map { it.id })
var sumOfTasksTime = 0L
measureTimeMillis {
sumOfTasksTime = loadedBooks
.map { bookLifecycle.regenerateThumbnailAndPersist(it) }
.map {
try {
it.get()
} catch (ex: Exception) {
0L
}
}
.sum()
}.also {
logger.info { "Generated ${loadedBooks.size} thumbnails in ${DurationFormatUtils.formatDurationHMS(it)} (virtual: ${DurationFormatUtils.formatDurationHMS(sumOfTasksTime)})" }
}
}
@Async("reAnalyzeBooksTaskExecutor")
@Transactional
fun reAnalyzeBooks(books: List<Book>) {
val loadedBooks = bookRepository.findAllById(books.map { it.id })
loadedBooks.forEach { it.media.reset() }
bookRepository.saveAll(loadedBooks)
loadedBooks.map { bookLifecycle.analyzeAndPersist(it) }
}
@Async("reRefreshMetadataTaskExecutor")
@Transactional
fun refreshBooksMetadata(books: List<Book>) {
bookRepository
.findAllById(books.map { it.id })
.forEach { metadataLifecycle.refreshMetadata(it) }
}
}

View file

@ -1,7 +1,6 @@
package org.gotson.komga.application.service package org.gotson.komga.application.service
import mu.KotlinLogging import mu.KotlinLogging
import org.apache.commons.lang3.time.DurationFormatUtils
import org.gotson.komga.domain.model.Book import org.gotson.komga.domain.model.Book
import org.gotson.komga.domain.model.BookPageContent import org.gotson.komga.domain.model.BookPageContent
import org.gotson.komga.domain.model.ImageConversionException import org.gotson.komga.domain.model.ImageConversionException
@ -11,12 +10,7 @@ import org.gotson.komga.domain.persistence.BookRepository
import org.gotson.komga.domain.service.BookAnalyzer import org.gotson.komga.domain.service.BookAnalyzer
import org.gotson.komga.infrastructure.image.ImageConverter import org.gotson.komga.infrastructure.image.ImageConverter
import org.gotson.komga.infrastructure.image.ImageType import org.gotson.komga.infrastructure.image.ImageType
import org.springframework.scheduling.annotation.Async
import org.springframework.scheduling.annotation.AsyncResult
import org.springframework.stereotype.Service import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional
import java.util.concurrent.Future
import kotlin.system.measureTimeMillis
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
@ -27,11 +21,8 @@ class BookLifecycle(
private val imageConverter: ImageConverter private val imageConverter: ImageConverter
) { ) {
@Transactional fun analyzeAndPersist(book: Book) {
@Async("analyzeBookTaskExecutor")
fun analyzeAndPersist(book: Book): Future<Long> {
logger.info { "Analyze and persist book: $book" } logger.info { "Analyze and persist book: $book" }
return AsyncResult(measureTimeMillis {
try { try {
book.media = bookAnalyzer.analyze(book) book.media = bookAnalyzer.analyze(book)
} catch (ex: Exception) { } catch (ex: Exception) {
@ -39,14 +30,10 @@ class BookLifecycle(
book.media = Media(status = Media.Status.ERROR, comment = ex.message) book.media = Media(status = Media.Status.ERROR, comment = ex.message)
} }
bookRepository.save(book) bookRepository.save(book)
}.also { logger.info { "Parsing finished in ${DurationFormatUtils.formatDurationHMS(it)}" } })
} }
@Transactional fun regenerateThumbnailAndPersist(book: Book) {
@Async("analyzeBookTaskExecutor")
fun regenerateThumbnailAndPersist(book: Book): Future<Long> {
logger.info { "Regenerate thumbnail and persist book: $book" } logger.info { "Regenerate thumbnail and persist book: $book" }
return AsyncResult(measureTimeMillis {
try { try {
book.media = bookAnalyzer.regenerateThumbnail(book) book.media = bookAnalyzer.regenerateThumbnail(book)
} catch (ex: Exception) { } catch (ex: Exception) {
@ -54,7 +41,6 @@ class BookLifecycle(
book.media = Media(status = Media.Status.ERROR) book.media = Media(status = Media.Status.ERROR)
} }
bookRepository.save(book) bookRepository.save(book)
}.also { logger.info { "Thumbnail generated in ${DurationFormatUtils.formatDurationHMS(it)}" } })
} }
@Throws( @Throws(

View file

@ -1,6 +1,7 @@
package org.gotson.komga.application.service package org.gotson.komga.application.service
import mu.KotlinLogging import mu.KotlinLogging
import org.gotson.komga.application.tasks.TaskReceiver
import org.gotson.komga.domain.model.DirectoryNotFoundException import org.gotson.komga.domain.model.DirectoryNotFoundException
import org.gotson.komga.domain.model.DuplicateNameException import org.gotson.komga.domain.model.DuplicateNameException
import org.gotson.komga.domain.model.Library import org.gotson.komga.domain.model.Library
@ -12,7 +13,6 @@ import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional import org.springframework.transaction.annotation.Transactional
import java.io.FileNotFoundException import java.io.FileNotFoundException
import java.nio.file.Files import java.nio.file.Files
import java.util.concurrent.RejectedExecutionException
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
@ -21,7 +21,7 @@ class LibraryLifecycle(
private val libraryRepository: LibraryRepository, private val libraryRepository: LibraryRepository,
private val seriesRepository: SeriesRepository, private val seriesRepository: SeriesRepository,
private val userRepository: KomgaUserRepository, private val userRepository: KomgaUserRepository,
private val asyncOrchestrator: AsyncOrchestrator private val taskReceiver: TaskReceiver
) { ) {
@Throws( @Throws(
@ -49,15 +49,8 @@ class LibraryLifecycle(
throw PathContainedInPath("Library path ${library.path()} is a parent of existing library ${it.name}: ${it.path()}") throw PathContainedInPath("Library path ${library.path()} is a parent of existing library ${it.name}: ${it.path()}")
} }
libraryRepository.save(library) libraryRepository.save(library)
taskReceiver.scanLibrary(library)
logger.info { "Trying to launch a scan for the newly added library: ${library.name}" }
try {
asyncOrchestrator.scanAndAnalyzeAllLibraries()
} catch (e: RejectedExecutionException) {
logger.warn { "Another scan is already running, skipping" }
}
return library return library
} }

View file

@ -6,41 +6,28 @@ import org.gotson.komga.domain.persistence.BookRepository
import org.gotson.komga.domain.persistence.SeriesRepository import org.gotson.komga.domain.persistence.SeriesRepository
import org.gotson.komga.domain.service.MetadataApplier import org.gotson.komga.domain.service.MetadataApplier
import org.gotson.komga.infrastructure.metadata.BookMetadataProvider import org.gotson.komga.infrastructure.metadata.BookMetadataProvider
import org.gotson.komga.infrastructure.metadata.comicinfo.ComicInfoProvider
import org.springframework.data.repository.findByIdOrNull
import org.springframework.scheduling.annotation.Async
import org.springframework.stereotype.Service import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
@Service @Service
class MetadataLifecycle( class MetadataLifecycle(
private val comicInfoProvider: ComicInfoProvider,
private val bookMetadataProviders: List<BookMetadataProvider>, private val bookMetadataProviders: List<BookMetadataProvider>,
private val metadataApplier: MetadataApplier, private val metadataApplier: MetadataApplier,
private val bookRepository: BookRepository, private val bookRepository: BookRepository,
private val seriesRepository: SeriesRepository private val seriesRepository: SeriesRepository
) { ) {
@Transactional
@Async("refreshMetadataTaskExecutor")
fun refreshMetadata(book: Book) { fun refreshMetadata(book: Book) {
logger.info { "Refresh metadata for book: $book" } logger.info { "Refresh metadata for book: $book" }
val loadedBook = bookRepository.findByIdOrNull(book.id)
loadedBook?.let { bookToPatch ->
bookMetadataProviders.forEach { bookMetadataProviders.forEach {
val patch = it.getBookMetadataFromBook(bookToPatch) it.getBookMetadataFromBook(book)?.let { bPatch ->
metadataApplier.apply(bPatch, book)
patch?.let { bPatch -> bookRepository.save(book)
metadataApplier.apply(bPatch, bookToPatch)
bookRepository.save(bookToPatch)
bPatch.series?.let { sPatch -> bPatch.series?.let { sPatch ->
metadataApplier.apply(sPatch, bookToPatch.series) metadataApplier.apply(sPatch, book.series)
seriesRepository.save(bookToPatch.series) seriesRepository.save(book.series)
}
} }
} }
} }

View file

@ -0,0 +1,23 @@
package org.gotson.komga.application.tasks
import java.io.Serializable
sealed class Task : Serializable {
abstract fun uniqueId(): String
data class ScanLibrary(val libraryId: Long) : Task() {
override fun uniqueId() = "SCAN_LIBRARY_$libraryId"
}
data class AnalyzeBook(val bookId: Long) : Task() {
override fun uniqueId() = "ANALYZE_BOOK_$bookId"
}
data class GenerateBookThumbnail(val bookId: Long) : Task() {
override fun uniqueId() = "GENERATE_BOOK_THUMBNAIL_$bookId"
}
data class RefreshBookMetadata(val bookId: Long) : Task() {
override fun uniqueId() = "REFRESH_BOOK_METADATA_$bookId"
}
}

View file

@ -0,0 +1,67 @@
package org.gotson.komga.application.tasks
import mu.KotlinLogging
import org.gotson.komga.application.service.BookLifecycle
import org.gotson.komga.application.service.MetadataLifecycle
import org.gotson.komga.domain.persistence.BookRepository
import org.gotson.komga.domain.persistence.LibraryRepository
import org.gotson.komga.domain.service.LibraryScanner
import org.gotson.komga.infrastructure.jms.QUEUE_TASKS
import org.gotson.komga.infrastructure.jms.QUEUE_TASKS_SELECTOR
import org.springframework.data.repository.findByIdOrNull
import org.springframework.jms.annotation.JmsListener
import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional
import kotlin.time.measureTime
private val logger = KotlinLogging.logger {}
@Service
class TaskHandler(
private val taskReceiver: TaskReceiver,
private val libraryRepository: LibraryRepository,
private val bookRepository: BookRepository,
private val libraryScanner: LibraryScanner,
private val bookLifecycle: BookLifecycle,
private val metadataLifecycle: MetadataLifecycle
) {
@JmsListener(destination = QUEUE_TASKS, selector = QUEUE_TASKS_SELECTOR)
@Transactional
fun handleTask(task: Task) {
logger.info { "Executing task: $task" }
try {
measureTime {
when (task) {
is Task.ScanLibrary ->
libraryRepository.findByIdOrNull(task.libraryId)?.let {
libraryScanner.scanRootFolder(it)
taskReceiver.analyzeUnknownBooks(it)
} ?: logger.warn { "Cannot execute task $task: Library does not exist" }
is Task.AnalyzeBook ->
bookRepository.findByIdOrNull(task.bookId)?.let {
bookLifecycle.analyzeAndPersist(it)
taskReceiver.refreshBookMetadata(it)
} ?: logger.warn { "Cannot execute task $task: Book does not exist" }
is Task.GenerateBookThumbnail ->
bookRepository.findByIdOrNull(task.bookId)?.let {
bookLifecycle.regenerateThumbnailAndPersist(it)
} ?: logger.warn { "Cannot execute task $task: Book does not exist" }
is Task.RefreshBookMetadata ->
bookRepository.findByIdOrNull(task.bookId)?.let {
metadataLifecycle.refreshMetadata(it)
} ?: logger.warn { "Cannot execute task $task: Book does not exist" }
}
}.also {
logger.info { "Task $task executed in $it" }
}
} catch (e: Exception) {
logger.error(e) { "Task $task execution failed" }
}
}
}

View file

@ -0,0 +1,60 @@
package org.gotson.komga.application.tasks
import mu.KotlinLogging
import org.gotson.komga.domain.model.Book
import org.gotson.komga.domain.model.Library
import org.gotson.komga.domain.model.Media
import org.gotson.komga.domain.persistence.BookRepository
import org.gotson.komga.domain.persistence.LibraryRepository
import org.gotson.komga.infrastructure.jms.QUEUE_TASKS
import org.gotson.komga.infrastructure.jms.QUEUE_TASKS_TYPE
import org.gotson.komga.infrastructure.jms.QUEUE_TYPE
import org.gotson.komga.infrastructure.jms.QUEUE_UNIQUE_ID
import org.springframework.jms.core.JmsTemplate
import org.springframework.stereotype.Service
private val logger = KotlinLogging.logger {}
@Service
class TaskReceiver(
private val jmsTemplate: JmsTemplate,
private val libraryRepository: LibraryRepository,
private val bookRepository: BookRepository
) {
fun scanLibraries() {
libraryRepository.findAll().forEach { scanLibrary(it) }
}
fun scanLibrary(library: Library) {
submitTask(Task.ScanLibrary(library.id))
}
fun analyzeUnknownBooks(library: Library) {
bookRepository.findAllByMediaStatusAndSeriesLibrary(Media.Status.UNKNOWN, library).forEach {
submitTask(Task.AnalyzeBook(it.id))
}
}
fun analyzeBook(book: Book) {
submitTask(Task.AnalyzeBook(book.id))
}
fun generateBookThumbnail(book: Book) {
submitTask(Task.GenerateBookThumbnail(book.id))
}
fun refreshBookMetadata(book: Book) {
submitTask(Task.RefreshBookMetadata(book.id))
}
private fun submitTask(task: Task) {
logger.info { "Sending task: $task" }
jmsTemplate.convertAndSend(QUEUE_TASKS, task) {
it.apply {
setStringProperty(QUEUE_TYPE, QUEUE_TASKS_TYPE)
setStringProperty(QUEUE_UNIQUE_ID, task.uniqueId())
}
}
}
}

View file

@ -28,8 +28,9 @@ interface BookRepository : JpaRepository<Book, Long>, JpaSpecificationExecutor<B
fun findBySeriesLibraryIn(seriesLibrary: Collection<Library>, pageable: Pageable): Page<Book> fun findBySeriesLibraryIn(seriesLibrary: Collection<Library>, pageable: Pageable): Page<Book>
fun findBySeriesLibraryIn(seriesLibrary: Collection<Library>): List<Book> fun findBySeriesLibraryIn(seriesLibrary: Collection<Library>): List<Book>
fun findBySeriesLibrary(seriesLibrary: Library): List<Book>
fun findByUrl(url: URL): Book? fun findByUrl(url: URL): Book?
fun findAllByMediaStatus(status: Media.Status): List<Book> fun findAllByMediaStatusAndSeriesLibrary(status: Media.Status, library: Library): List<Book>
fun findAllByMediaThumbnailIsNull(): List<Book> fun findAllByMediaThumbnailIsNull(): List<Book>
} }

View file

@ -1,18 +1,13 @@
package org.gotson.komga.domain.service package org.gotson.komga.domain.service
import mu.KotlinLogging import mu.KotlinLogging
import org.apache.commons.lang3.time.DurationFormatUtils
import org.gotson.komga.application.service.BookLifecycle
import org.gotson.komga.application.service.MetadataLifecycle
import org.gotson.komga.domain.model.Library import org.gotson.komga.domain.model.Library
import org.gotson.komga.domain.model.Media
import org.gotson.komga.domain.persistence.BookRepository import org.gotson.komga.domain.persistence.BookRepository
import org.gotson.komga.domain.persistence.SeriesRepository import org.gotson.komga.domain.persistence.SeriesRepository
import org.springframework.stereotype.Service import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional import org.springframework.transaction.annotation.Transactional
import java.nio.file.Paths import java.nio.file.Paths
import java.time.temporal.ChronoUnit import java.time.temporal.ChronoUnit
import kotlin.system.measureTimeMillis
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
@ -20,15 +15,12 @@ private val logger = KotlinLogging.logger {}
class LibraryScanner( class LibraryScanner(
private val fileSystemScanner: FileSystemScanner, private val fileSystemScanner: FileSystemScanner,
private val seriesRepository: SeriesRepository, private val seriesRepository: SeriesRepository,
private val bookRepository: BookRepository, private val bookRepository: BookRepository
private val bookLifecycle: BookLifecycle,
private val metadataLifecycle: MetadataLifecycle
) { ) {
@Transactional @Transactional
fun scanRootFolder(library: Library) { fun scanRootFolder(library: Library) {
logger.info { "Updating library: ${library.name}, root folder: ${library.root}" } logger.info { "Updating library: ${library.name}, root folder: ${library.root}" }
measureTimeMillis {
val scannedSeries = fileSystemScanner.scanRootFolder(Paths.get(library.root.toURI())) val scannedSeries = fileSystemScanner.scanRootFolder(Paths.get(library.root.toURI()))
// delete series that don't exist anymore // delete series that don't exist anymore
@ -74,32 +66,6 @@ class LibraryScanner(
} }
} }
} }
}.also { logger.info { "Library update finished in ${DurationFormatUtils.formatDurationHMS(it)}" } }
} }
fun analyzeUnknownBooks() {
logger.info { "Analyze all books in status: unknown" }
val booksToAnalyze = bookRepository.findAllByMediaStatus(Media.Status.UNKNOWN)
var sumOfTasksTime = 0L
measureTimeMillis {
sumOfTasksTime = booksToAnalyze
.map { bookLifecycle.analyzeAndPersist(it) }
.map {
try {
it.get()
} catch (ex: Exception) {
0L
}
}
.sum()
}.also {
logger.info { "Analyzed ${booksToAnalyze.size} books in ${DurationFormatUtils.formatDurationHMS(it)} (virtual: ${DurationFormatUtils.formatDurationHMS(sumOfTasksTime)})" }
}
logger.info { "Refresh metadata for all books analyzed" }
booksToAnalyze.forEach {
metadataLifecycle.refreshMetadata(it)
}
}
} }

View file

@ -1,55 +0,0 @@
package org.gotson.komga.infrastructure.async
import org.gotson.komga.infrastructure.configuration.KomgaProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.scheduling.annotation.EnableAsync
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
import java.util.concurrent.Executor
@Configuration
@EnableAsync
class AsyncConfiguration(
private val komgaProperties: KomgaProperties
) {
@Bean("analyzeBookTaskExecutor")
fun analyzeBookTaskExecutor(): Executor =
ThreadPoolTaskExecutor().apply {
corePoolSize = komgaProperties.threads.analyzer
}
@Bean("periodicScanTaskExecutor")
fun periodicScanTaskExecutor(): Executor =
ThreadPoolTaskExecutor().apply {
corePoolSize = 1
maxPoolSize = 1
setQueueCapacity(0)
}
@Bean("regenerateThumbnailsTaskExecutor")
fun regenerateThumbnailsTaskExecutor(): Executor =
ThreadPoolTaskExecutor().apply {
corePoolSize = 1
maxPoolSize = 1
setQueueCapacity(0)
}
@Bean("reAnalyzeBooksTaskExecutor")
fun reAnalyzeBooksTaskExecutor(): Executor =
ThreadPoolTaskExecutor().apply {
corePoolSize = 1
}
@Bean("refreshMetadataTaskExecutor")
fun refreshMetadataTaskExecutor(): Executor =
ThreadPoolTaskExecutor().apply {
corePoolSize = 1
}
@Bean("reRefreshMetadataTaskExecutor")
fun reRefreshMetadataTaskExecutor(): Executor =
ThreadPoolTaskExecutor().apply {
corePoolSize = 1
}
}

View file

@ -0,0 +1,32 @@
package org.gotson.komga.infrastructure.jms
import org.apache.activemq.artemis.api.core.RoutingType
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration
import org.apache.activemq.artemis.core.settings.impl.AddressSettings
import org.springframework.boot.autoconfigure.jms.artemis.ArtemisConfigurationCustomizer
import org.springframework.context.annotation.Configuration
const val QUEUE_UNIQUE_ID = "unique_id"
const val QUEUE_TYPE = "type"
const val QUEUE_TASKS = "tasks.background"
const val QUEUE_TASKS_TYPE = "task"
const val QUEUE_TASKS_SELECTOR = "$QUEUE_TYPE = '$QUEUE_TASKS_TYPE'"
@Configuration
class ArtemisConfig : ArtemisConfigurationCustomizer {
override fun customize(configuration: org.apache.activemq.artemis.core.config.Configuration?) {
configuration?.let {
// disable prefetch, ensures messages stay in the queue and last value can have desired effect
it.addAddressesSetting(QUEUE_TASKS, AddressSettings().apply {
defaultConsumerWindowSize = 0
})
it.addQueueConfiguration(
CoreQueueConfiguration()
.setAddress(QUEUE_TASKS)
.setName(QUEUE_TASKS)
.setLastValueKey(QUEUE_UNIQUE_ID)
.setRoutingType(RoutingType.ANYCAST)
)
}
}
}

View file

@ -1,7 +1,7 @@
package org.gotson.komga.interfaces.rest package org.gotson.komga.interfaces.rest
import mu.KotlinLogging import mu.KotlinLogging
import org.gotson.komga.application.service.AsyncOrchestrator import org.gotson.komga.application.tasks.TaskReceiver
import org.gotson.komga.domain.persistence.BookRepository import org.gotson.komga.domain.persistence.BookRepository
import org.springframework.http.HttpStatus import org.springframework.http.HttpStatus
import org.springframework.security.access.prepost.PreAuthorize import org.springframework.security.access.prepost.PreAuthorize
@ -9,8 +9,6 @@ import org.springframework.web.bind.annotation.PostMapping
import org.springframework.web.bind.annotation.RequestMapping import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.ResponseStatus import org.springframework.web.bind.annotation.ResponseStatus
import org.springframework.web.bind.annotation.RestController import org.springframework.web.bind.annotation.RestController
import org.springframework.web.server.ResponseStatusException
import java.util.concurrent.RejectedExecutionException
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
@ -18,29 +16,21 @@ private val logger = KotlinLogging.logger {}
@RequestMapping("api/v1/admin") @RequestMapping("api/v1/admin")
@PreAuthorize("hasRole('ADMIN')") @PreAuthorize("hasRole('ADMIN')")
class AdminController( class AdminController(
private val asyncOrchestrator: AsyncOrchestrator, private val bookRepository: BookRepository,
private val bookRepository: BookRepository private val taskReceiver: TaskReceiver
) { ) {
@PostMapping("rpc/thumbnails/regenerate/all") @PostMapping("rpc/thumbnails/regenerate/all")
@ResponseStatus(HttpStatus.ACCEPTED) @ResponseStatus(HttpStatus.ACCEPTED)
fun regenerateAllThumbnails() { fun regenerateAllThumbnails() {
try {
logger.info { "Regenerate thumbnail for all books" } logger.info { "Regenerate thumbnail for all books" }
asyncOrchestrator.generateThumbnails(bookRepository.findAll()) bookRepository.findAll().forEach { taskReceiver.generateBookThumbnail(it) }
} catch (e: RejectedExecutionException) {
throw ResponseStatusException(HttpStatus.SERVICE_UNAVAILABLE, "Thumbnail regeneration task is already running")
}
} }
@PostMapping("rpc/thumbnails/regenerate/missing") @PostMapping("rpc/thumbnails/regenerate/missing")
@ResponseStatus(HttpStatus.ACCEPTED) @ResponseStatus(HttpStatus.ACCEPTED)
fun regenerateMissingThumbnails() { fun regenerateMissingThumbnails() {
try {
logger.info { "Regenerate missing thumbnails" } logger.info { "Regenerate missing thumbnails" }
asyncOrchestrator.generateThumbnails(bookRepository.findAllByMediaThumbnailIsNull()) bookRepository.findAllByMediaThumbnailIsNull().forEach { taskReceiver.generateBookThumbnail(it) }
} catch (e: RejectedExecutionException) {
throw ResponseStatusException(HttpStatus.SERVICE_UNAVAILABLE, "Thumbnail regeneration task is already running")
}
} }
} }

View file

@ -9,8 +9,8 @@ import io.swagger.v3.oas.annotations.media.Content
import io.swagger.v3.oas.annotations.media.Schema import io.swagger.v3.oas.annotations.media.Schema
import io.swagger.v3.oas.annotations.responses.ApiResponse import io.swagger.v3.oas.annotations.responses.ApiResponse
import mu.KotlinLogging import mu.KotlinLogging
import org.gotson.komga.application.service.AsyncOrchestrator
import org.gotson.komga.application.service.BookLifecycle import org.gotson.komga.application.service.BookLifecycle
import org.gotson.komga.application.tasks.TaskReceiver
import org.gotson.komga.domain.model.Author import org.gotson.komga.domain.model.Author
import org.gotson.komga.domain.model.Book import org.gotson.komga.domain.model.Book
import org.gotson.komga.domain.model.BookMetadata import org.gotson.komga.domain.model.BookMetadata
@ -57,7 +57,6 @@ import org.springframework.web.server.ResponseStatusException
import java.io.FileNotFoundException import java.io.FileNotFoundException
import java.nio.file.NoSuchFileException import java.nio.file.NoSuchFileException
import java.time.ZoneOffset import java.time.ZoneOffset
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import javax.persistence.criteria.JoinType import javax.persistence.criteria.JoinType
import javax.validation.Valid import javax.validation.Valid
@ -69,7 +68,7 @@ private val logger = KotlinLogging.logger {}
class BookController( class BookController(
private val bookRepository: BookRepository, private val bookRepository: BookRepository,
private val bookLifecycle: BookLifecycle, private val bookLifecycle: BookLifecycle,
private val asyncOrchestrator: AsyncOrchestrator private val taskReceiver: TaskReceiver
) { ) {
@PageableAsQueryParam @PageableAsQueryParam
@ -345,11 +344,7 @@ class BookController(
@ResponseStatus(HttpStatus.ACCEPTED) @ResponseStatus(HttpStatus.ACCEPTED)
fun analyze(@PathVariable bookId: Long) { fun analyze(@PathVariable bookId: Long) {
bookRepository.findByIdOrNull(bookId)?.let { book -> bookRepository.findByIdOrNull(bookId)?.let { book ->
try { taskReceiver.analyzeBook(book)
asyncOrchestrator.reAnalyzeBooks(listOf(book))
} catch (e: RejectedExecutionException) {
throw ResponseStatusException(HttpStatus.SERVICE_UNAVAILABLE, "Another book analysis task is already running")
}
} ?: throw ResponseStatusException(HttpStatus.NOT_FOUND) } ?: throw ResponseStatusException(HttpStatus.NOT_FOUND)
} }
@ -358,11 +353,7 @@ class BookController(
@ResponseStatus(HttpStatus.ACCEPTED) @ResponseStatus(HttpStatus.ACCEPTED)
fun refreshMetadata(@PathVariable bookId: Long) { fun refreshMetadata(@PathVariable bookId: Long) {
bookRepository.findByIdOrNull(bookId)?.let { book -> bookRepository.findByIdOrNull(bookId)?.let { book ->
try { taskReceiver.refreshBookMetadata(book)
asyncOrchestrator.refreshBooksMetadata(listOf(book))
} catch (e: RejectedExecutionException) {
throw ResponseStatusException(HttpStatus.SERVICE_UNAVAILABLE, "Another metadata refresh task is already running")
}
} ?: throw ResponseStatusException(HttpStatus.NOT_FOUND) } ?: throw ResponseStatusException(HttpStatus.NOT_FOUND)
} }

View file

@ -1,8 +1,8 @@
package org.gotson.komga.interfaces.rest package org.gotson.komga.interfaces.rest
import mu.KotlinLogging import mu.KotlinLogging
import org.gotson.komga.application.service.AsyncOrchestrator
import org.gotson.komga.application.service.LibraryLifecycle import org.gotson.komga.application.service.LibraryLifecycle
import org.gotson.komga.application.tasks.TaskReceiver
import org.gotson.komga.domain.model.DirectoryNotFoundException import org.gotson.komga.domain.model.DirectoryNotFoundException
import org.gotson.komga.domain.model.DuplicateNameException import org.gotson.komga.domain.model.DuplicateNameException
import org.gotson.komga.domain.model.Library import org.gotson.komga.domain.model.Library
@ -25,7 +25,6 @@ import org.springframework.web.bind.annotation.ResponseStatus
import org.springframework.web.bind.annotation.RestController import org.springframework.web.bind.annotation.RestController
import org.springframework.web.server.ResponseStatusException import org.springframework.web.server.ResponseStatusException
import java.io.FileNotFoundException import java.io.FileNotFoundException
import java.util.concurrent.RejectedExecutionException
import javax.validation.Valid import javax.validation.Valid
import javax.validation.constraints.NotBlank import javax.validation.constraints.NotBlank
@ -37,7 +36,7 @@ class LibraryController(
private val libraryLifecycle: LibraryLifecycle, private val libraryLifecycle: LibraryLifecycle,
private val libraryRepository: LibraryRepository, private val libraryRepository: LibraryRepository,
private val bookRepository: BookRepository, private val bookRepository: BookRepository,
private val asyncOrchestrator: AsyncOrchestrator private val taskReceiver: TaskReceiver
) { ) {
@GetMapping @GetMapping
@ -93,11 +92,7 @@ class LibraryController(
@ResponseStatus(HttpStatus.ACCEPTED) @ResponseStatus(HttpStatus.ACCEPTED)
fun scan(@PathVariable libraryId: Long) { fun scan(@PathVariable libraryId: Long) {
libraryRepository.findByIdOrNull(libraryId)?.let { library -> libraryRepository.findByIdOrNull(libraryId)?.let { library ->
try { taskReceiver.scanLibrary(library)
asyncOrchestrator.scanAndAnalyzeOneLibrary(library)
} catch (e: RejectedExecutionException) {
throw ResponseStatusException(HttpStatus.SERVICE_UNAVAILABLE, "Another scan task is already running")
}
} ?: throw ResponseStatusException(HttpStatus.NOT_FOUND) } ?: throw ResponseStatusException(HttpStatus.NOT_FOUND)
} }
@ -106,11 +101,7 @@ class LibraryController(
@ResponseStatus(HttpStatus.ACCEPTED) @ResponseStatus(HttpStatus.ACCEPTED)
fun analyze(@PathVariable libraryId: Long) { fun analyze(@PathVariable libraryId: Long) {
libraryRepository.findByIdOrNull(libraryId)?.let { library -> libraryRepository.findByIdOrNull(libraryId)?.let { library ->
try { bookRepository.findBySeriesLibrary(library).forEach { taskReceiver.analyzeBook(it) }
asyncOrchestrator.reAnalyzeBooks(bookRepository.findBySeriesLibraryIn(listOf(library)))
} catch (e: RejectedExecutionException) {
throw ResponseStatusException(HttpStatus.SERVICE_UNAVAILABLE, "Another book analysis task is already running")
}
} ?: throw ResponseStatusException(HttpStatus.NOT_FOUND) } ?: throw ResponseStatusException(HttpStatus.NOT_FOUND)
} }
@ -119,11 +110,7 @@ class LibraryController(
@ResponseStatus(HttpStatus.ACCEPTED) @ResponseStatus(HttpStatus.ACCEPTED)
fun refreshMetadata(@PathVariable libraryId: Long) { fun refreshMetadata(@PathVariable libraryId: Long) {
libraryRepository.findByIdOrNull(libraryId)?.let { library -> libraryRepository.findByIdOrNull(libraryId)?.let { library ->
try { bookRepository.findBySeriesLibrary(library).forEach { taskReceiver.refreshBookMetadata(it) }
asyncOrchestrator.refreshBooksMetadata(bookRepository.findBySeriesLibraryIn(listOf(library)))
} catch (e: RejectedExecutionException) {
throw ResponseStatusException(HttpStatus.SERVICE_UNAVAILABLE, "Another metadata refresh task is already running")
}
} ?: throw ResponseStatusException(HttpStatus.NOT_FOUND) } ?: throw ResponseStatusException(HttpStatus.NOT_FOUND)
} }
} }

View file

@ -9,7 +9,7 @@ import io.swagger.v3.oas.annotations.media.Content
import io.swagger.v3.oas.annotations.media.Schema import io.swagger.v3.oas.annotations.media.Schema
import io.swagger.v3.oas.annotations.responses.ApiResponse import io.swagger.v3.oas.annotations.responses.ApiResponse
import mu.KotlinLogging import mu.KotlinLogging
import org.gotson.komga.application.service.AsyncOrchestrator import org.gotson.komga.application.tasks.TaskReceiver
import org.gotson.komga.domain.model.Library import org.gotson.komga.domain.model.Library
import org.gotson.komga.domain.model.Media import org.gotson.komga.domain.model.Media
import org.gotson.komga.domain.model.Series import org.gotson.komga.domain.model.Series
@ -45,7 +45,6 @@ import org.springframework.web.bind.annotation.RequestParam
import org.springframework.web.bind.annotation.ResponseStatus import org.springframework.web.bind.annotation.ResponseStatus
import org.springframework.web.bind.annotation.RestController import org.springframework.web.bind.annotation.RestController
import org.springframework.web.server.ResponseStatusException import org.springframework.web.server.ResponseStatusException
import java.util.concurrent.RejectedExecutionException
import javax.validation.Valid import javax.validation.Valid
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
@ -56,7 +55,7 @@ class SeriesController(
private val seriesRepository: SeriesRepository, private val seriesRepository: SeriesRepository,
private val bookRepository: BookRepository, private val bookRepository: BookRepository,
private val bookController: BookController, private val bookController: BookController,
private val asyncOrchestrator: AsyncOrchestrator private val taskReceiver: TaskReceiver
) { ) {
@GetMapping @GetMapping
@ -222,11 +221,7 @@ class SeriesController(
@ResponseStatus(HttpStatus.ACCEPTED) @ResponseStatus(HttpStatus.ACCEPTED)
fun analyze(@PathVariable seriesId: Long) { fun analyze(@PathVariable seriesId: Long) {
seriesRepository.findByIdOrNull(seriesId)?.let { series -> seriesRepository.findByIdOrNull(seriesId)?.let { series ->
try { series.books.forEach { taskReceiver.analyzeBook(it) }
asyncOrchestrator.reAnalyzeBooks(series.books)
} catch (e: RejectedExecutionException) {
throw ResponseStatusException(HttpStatus.SERVICE_UNAVAILABLE, "Another book analysis task is already running")
}
} ?: throw ResponseStatusException(HttpStatus.NOT_FOUND) } ?: throw ResponseStatusException(HttpStatus.NOT_FOUND)
} }
@ -235,11 +230,7 @@ class SeriesController(
@ResponseStatus(HttpStatus.ACCEPTED) @ResponseStatus(HttpStatus.ACCEPTED)
fun refreshMetadata(@PathVariable seriesId: Long) { fun refreshMetadata(@PathVariable seriesId: Long) {
seriesRepository.findByIdOrNull(seriesId)?.let { series -> seriesRepository.findByIdOrNull(seriesId)?.let { series ->
try { series.books.forEach { taskReceiver.refreshBookMetadata(it) }
asyncOrchestrator.refreshBooksMetadata(series.books)
} catch (e: RejectedExecutionException) {
throw ResponseStatusException(HttpStatus.SERVICE_UNAVAILABLE, "Another metadata refresh task is already running")
}
} ?: throw ResponseStatusException(HttpStatus.NOT_FOUND) } ?: throw ResponseStatusException(HttpStatus.NOT_FOUND)
} }

View file

@ -1,29 +1,24 @@
package org.gotson.komga.interfaces.scheduler package org.gotson.komga.interfaces.scheduler
import mu.KotlinLogging import mu.KotlinLogging
import org.gotson.komga.application.service.AsyncOrchestrator import org.gotson.komga.application.tasks.TaskReceiver
import org.springframework.boot.context.event.ApplicationReadyEvent import org.springframework.boot.context.event.ApplicationReadyEvent
import org.springframework.context.annotation.Profile import org.springframework.context.annotation.Profile
import org.springframework.context.event.EventListener import org.springframework.context.event.EventListener
import org.springframework.scheduling.annotation.Scheduled import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Controller import org.springframework.stereotype.Controller
import java.util.concurrent.RejectedExecutionException
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
@Profile("!test") @Profile("!test")
@Controller @Controller
class PeriodicScannerController( class PeriodicScannerController(
private val asyncOrchestrator: AsyncOrchestrator private val taskReceiver: TaskReceiver
) { ) {
@EventListener(ApplicationReadyEvent::class) @EventListener(ApplicationReadyEvent::class)
@Scheduled(cron = "#{@komgaProperties.librariesScanCron ?: '-'}") @Scheduled(cron = "#{@komgaProperties.librariesScanCron ?: '-'}")
fun scanRootFolder() { fun scanAllLibraries() {
try { taskReceiver.scanLibraries()
asyncOrchestrator.scanAndAnalyzeAllLibraries()
} catch (e: RejectedExecutionException) {
logger.warn { "Another scan is already running, skipping" }
}
} }
} }

View file

@ -21,14 +21,15 @@ logging:
max-history: 1 max-history: 1
name: komga-dev.log name: komga-dev.log
level: level:
org.gotson.komga: DEBUG org.apache.activemq.audit.message: WARN
web: DEBUG # web: DEBUG
# org.hibernate: # org.gotson.komga: DEBUG
# stat: DEBUG # org.springframework.jms: DEBUG
# SQL: DEBUG # org.springframework.security.web.FilterChainProxy: DEBUG
# cache: DEBUG # org.hibernate.stat: DEBUG
# type.descriptor.sql.BasicBinder: TRACE # org.hibernate.SQL: DEBUG
# org.springframework.security.web.FilterChainProxy: DEBUG # org.hibernate.cache: DEBUG
# org.hibernate.type.descriptor.sql.BasicBinder: TRACE
management.metrics.export.influx: management.metrics.export.influx:
# enabled: true # enabled: true

View file

@ -4,6 +4,8 @@ logging:
file: file:
max-history: 10 max-history: 10
name: komga.log name: komga.log
level:
org.apache.activemq.audit.message: WARN
komga: komga:
libraries-scan-cron: "0 */15 * * * ?" libraries-scan-cron: "0 */15 * * * ?"

View file

@ -196,7 +196,7 @@ class LibraryScannerTest(
libraryScanner.scanRootFolder(library) libraryScanner.scanRootFolder(library)
every { mockAnalyzer.analyze(any()) } returns Media(status = Media.Status.READY, mediaType = "application/zip", pages = mutableListOf(makeBookPage("1.jpg"), makeBookPage("2.jpg"))) every { mockAnalyzer.analyze(any()) } returns Media(status = Media.Status.READY, mediaType = "application/zip", pages = mutableListOf(makeBookPage("1.jpg"), makeBookPage("2.jpg")))
bookRepository.findAll().map { bookLifecycle.analyzeAndPersist(it) }.map { it.get() } bookRepository.findAll().map { bookLifecycle.analyzeAndPersist(it) }
// when // when
libraryScanner.scanRootFolder(library) libraryScanner.scanRootFolder(library)

View file

@ -0,0 +1,95 @@
package org.gotson.komga.infrastructure.jms
import mu.KotlinLogging
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtendWith
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.jms.core.JmsTemplate
import org.springframework.jms.support.destination.JmsDestinationAccessor
import org.springframework.test.context.junit.jupiter.SpringExtension
import javax.jms.QueueBrowser
import javax.jms.Session
private val logger = KotlinLogging.logger {}
@ExtendWith(SpringExtension::class)
@SpringBootTest
class ArtemisConfigTest(
@Autowired private val jmsTemplate: JmsTemplate
) {
init {
jmsTemplate.receiveTimeout = JmsDestinationAccessor.RECEIVE_TIMEOUT_NO_WAIT
}
@BeforeEach
fun emptyQueue() {
while (jmsTemplate.receive(QUEUE_TASKS) != null) {
logger.info { "Emptying queue" }
}
}
@Test
fun `when sending messages with the same unique id then messages are deduplicated`() {
for (i in 1..5) {
jmsTemplate.convertAndSend(
QUEUE_TASKS,
"message $i"
) {
it.apply { setStringProperty(QUEUE_UNIQUE_ID, "1") }
}
}
val size = jmsTemplate.browse(QUEUE_TASKS) { _: Session, browser: QueueBrowser ->
browser.enumeration.toList().size
}
val msg = jmsTemplate.receiveAndConvert(QUEUE_TASKS) as String
assertThat(msg).isEqualTo("message 5")
assertThat(size).isEqualTo(1)
}
@Test
fun `when sending messages with some common unique id then messages are deduplicated`() {
for (i in 1..6) {
jmsTemplate.convertAndSend(
QUEUE_TASKS,
"message $i"
) {
it.apply { setStringProperty(QUEUE_UNIQUE_ID, i.rem(2).toString()) }
}
}
val size = jmsTemplate.browse(QUEUE_TASKS) { _: Session, browser: QueueBrowser ->
browser.enumeration.toList().size
}
val msg = jmsTemplate.receiveAndConvert(QUEUE_TASKS) as String
assertThat(msg).isEqualTo("message 5")
assertThat(size).isEqualTo(2)
}
@Test
fun `when sending messages without unique id then messages are not deduplicated`() {
for (i in 1..5) {
jmsTemplate.convertAndSend(
QUEUE_TASKS,
"message $i"
)
}
val size = jmsTemplate.browse(QUEUE_TASKS) { _: Session, browser: QueueBrowser ->
browser.enumeration.toList().size
}
val msg = jmsTemplate.receiveAndConvert(QUEUE_TASKS) as String
assertThat(msg).isEqualTo("message 1")
assertThat(size).isEqualTo(5)
}
}