feat(tasks): replace background tasks management

Use of Apache Artemis message queue instead async methods with executors.
Tasks are submitted to TaskReceiver, and handled by TaskHandler.
The Artemis queue is configured with last-value, which automatically remove duplicate tasks in the queue.
This commit is contained in:
Gauthier Roebroeck 2020-04-24 15:02:09 +08:00
parent e7f4e203fb
commit 60ce87a25d
21 changed files with 388 additions and 361 deletions

View file

@ -38,9 +38,12 @@ dependencies {
implementation("org.springframework.boot:spring-boot-starter-cache")
implementation("org.springframework.boot:spring-boot-starter-security")
implementation("org.springframework.boot:spring-boot-starter-thymeleaf")
implementation("org.springframework.boot:spring-boot-starter-artemis")
kapt("org.springframework.boot:spring-boot-configuration-processor")
implementation("org.apache.activemq:artemis-jms-server")
implementation("org.flywaydb:flyway-core")
implementation("org.hibernate:hibernate-jcache")
@ -93,6 +96,7 @@ dependencies {
testImplementation("com.tngtech.archunit:archunit-junit5:0.13.1")
developmentOnly("org.springframework.boot:spring-boot-devtools")
}
@ -100,7 +104,7 @@ tasks {
withType<KotlinCompile> {
kotlinOptions {
jvmTarget = "1.8"
freeCompilerArgs = listOf("-Xjsr305=strict")
freeCompilerArgs = listOf("-Xjsr305=strict", "-Xopt-in=kotlin.time.ExperimentalTime")
}
}

View file

@ -1,89 +0,0 @@
package org.gotson.komga.application.service
import mu.KotlinLogging
import org.apache.commons.lang3.time.DurationFormatUtils
import org.gotson.komga.domain.model.Book
import org.gotson.komga.domain.model.Library
import org.gotson.komga.domain.persistence.BookRepository
import org.gotson.komga.domain.persistence.LibraryRepository
import org.gotson.komga.domain.persistence.SeriesRepository
import org.gotson.komga.domain.service.LibraryScanner
import org.springframework.scheduling.annotation.Async
import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional
import kotlin.system.measureTimeMillis
private val logger = KotlinLogging.logger {}
@Service
class AsyncOrchestrator(
private val libraryScanner: LibraryScanner,
private val libraryRepository: LibraryRepository,
private val bookRepository: BookRepository,
private val bookLifecycle: BookLifecycle,
private val seriesRepository: SeriesRepository,
private val metadataLifecycle: MetadataLifecycle
) {
@Async("periodicScanTaskExecutor")
fun scanAndAnalyzeAllLibraries() {
logger.info { "Starting periodic libraries scan" }
val libraries = libraryRepository.findAll()
if (libraries.isEmpty()) {
logger.info { "No libraries defined, nothing to scan" }
} else {
libraries.forEach {
libraryScanner.scanRootFolder(it)
}
logger.info { "Starting periodic book parsing" }
libraryScanner.analyzeUnknownBooks()
}
}
@Async("periodicScanTaskExecutor")
fun scanAndAnalyzeOneLibrary(library: Library) {
libraryScanner.scanRootFolder(library)
libraryScanner.analyzeUnknownBooks()
}
@Async("regenerateThumbnailsTaskExecutor")
@Transactional
fun generateThumbnails(books: List<Book>) {
val loadedBooks = bookRepository.findAllById(books.map { it.id })
var sumOfTasksTime = 0L
measureTimeMillis {
sumOfTasksTime = loadedBooks
.map { bookLifecycle.regenerateThumbnailAndPersist(it) }
.map {
try {
it.get()
} catch (ex: Exception) {
0L
}
}
.sum()
}.also {
logger.info { "Generated ${loadedBooks.size} thumbnails in ${DurationFormatUtils.formatDurationHMS(it)} (virtual: ${DurationFormatUtils.formatDurationHMS(sumOfTasksTime)})" }
}
}
@Async("reAnalyzeBooksTaskExecutor")
@Transactional
fun reAnalyzeBooks(books: List<Book>) {
val loadedBooks = bookRepository.findAllById(books.map { it.id })
loadedBooks.forEach { it.media.reset() }
bookRepository.saveAll(loadedBooks)
loadedBooks.map { bookLifecycle.analyzeAndPersist(it) }
}
@Async("reRefreshMetadataTaskExecutor")
@Transactional
fun refreshBooksMetadata(books: List<Book>) {
bookRepository
.findAllById(books.map { it.id })
.forEach { metadataLifecycle.refreshMetadata(it) }
}
}

View file

@ -1,7 +1,6 @@
package org.gotson.komga.application.service
import mu.KotlinLogging
import org.apache.commons.lang3.time.DurationFormatUtils
import org.gotson.komga.domain.model.Book
import org.gotson.komga.domain.model.BookPageContent
import org.gotson.komga.domain.model.ImageConversionException
@ -11,12 +10,7 @@ import org.gotson.komga.domain.persistence.BookRepository
import org.gotson.komga.domain.service.BookAnalyzer
import org.gotson.komga.infrastructure.image.ImageConverter
import org.gotson.komga.infrastructure.image.ImageType
import org.springframework.scheduling.annotation.Async
import org.springframework.scheduling.annotation.AsyncResult
import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional
import java.util.concurrent.Future
import kotlin.system.measureTimeMillis
private val logger = KotlinLogging.logger {}
@ -27,34 +21,26 @@ class BookLifecycle(
private val imageConverter: ImageConverter
) {
@Transactional
@Async("analyzeBookTaskExecutor")
fun analyzeAndPersist(book: Book): Future<Long> {
fun analyzeAndPersist(book: Book) {
logger.info { "Analyze and persist book: $book" }
return AsyncResult(measureTimeMillis {
try {
book.media = bookAnalyzer.analyze(book)
} catch (ex: Exception) {
logger.error(ex) { "Error while analyzing book: $book" }
book.media = Media(status = Media.Status.ERROR, comment = ex.message)
}
bookRepository.save(book)
}.also { logger.info { "Parsing finished in ${DurationFormatUtils.formatDurationHMS(it)}" } })
try {
book.media = bookAnalyzer.analyze(book)
} catch (ex: Exception) {
logger.error(ex) { "Error while analyzing book: $book" }
book.media = Media(status = Media.Status.ERROR, comment = ex.message)
}
bookRepository.save(book)
}
@Transactional
@Async("analyzeBookTaskExecutor")
fun regenerateThumbnailAndPersist(book: Book): Future<Long> {
fun regenerateThumbnailAndPersist(book: Book) {
logger.info { "Regenerate thumbnail and persist book: $book" }
return AsyncResult(measureTimeMillis {
try {
book.media = bookAnalyzer.regenerateThumbnail(book)
} catch (ex: Exception) {
logger.error(ex) { "Error while recreating thumbnail" }
book.media = Media(status = Media.Status.ERROR)
}
bookRepository.save(book)
}.also { logger.info { "Thumbnail generated in ${DurationFormatUtils.formatDurationHMS(it)}" } })
try {
book.media = bookAnalyzer.regenerateThumbnail(book)
} catch (ex: Exception) {
logger.error(ex) { "Error while recreating thumbnail" }
book.media = Media(status = Media.Status.ERROR)
}
bookRepository.save(book)
}
@Throws(

View file

@ -1,6 +1,7 @@
package org.gotson.komga.application.service
import mu.KotlinLogging
import org.gotson.komga.application.tasks.TaskReceiver
import org.gotson.komga.domain.model.DirectoryNotFoundException
import org.gotson.komga.domain.model.DuplicateNameException
import org.gotson.komga.domain.model.Library
@ -12,7 +13,6 @@ import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional
import java.io.FileNotFoundException
import java.nio.file.Files
import java.util.concurrent.RejectedExecutionException
private val logger = KotlinLogging.logger {}
@ -21,7 +21,7 @@ class LibraryLifecycle(
private val libraryRepository: LibraryRepository,
private val seriesRepository: SeriesRepository,
private val userRepository: KomgaUserRepository,
private val asyncOrchestrator: AsyncOrchestrator
private val taskReceiver: TaskReceiver
) {
@Throws(
@ -49,15 +49,8 @@ class LibraryLifecycle(
throw PathContainedInPath("Library path ${library.path()} is a parent of existing library ${it.name}: ${it.path()}")
}
libraryRepository.save(library)
logger.info { "Trying to launch a scan for the newly added library: ${library.name}" }
try {
asyncOrchestrator.scanAndAnalyzeAllLibraries()
} catch (e: RejectedExecutionException) {
logger.warn { "Another scan is already running, skipping" }
}
taskReceiver.scanLibrary(library)
return library
}

View file

@ -6,41 +6,28 @@ import org.gotson.komga.domain.persistence.BookRepository
import org.gotson.komga.domain.persistence.SeriesRepository
import org.gotson.komga.domain.service.MetadataApplier
import org.gotson.komga.infrastructure.metadata.BookMetadataProvider
import org.gotson.komga.infrastructure.metadata.comicinfo.ComicInfoProvider
import org.springframework.data.repository.findByIdOrNull
import org.springframework.scheduling.annotation.Async
import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional
private val logger = KotlinLogging.logger {}
@Service
class MetadataLifecycle(
private val comicInfoProvider: ComicInfoProvider,
private val bookMetadataProviders: List<BookMetadataProvider>,
private val metadataApplier: MetadataApplier,
private val bookRepository: BookRepository,
private val seriesRepository: SeriesRepository
) {
@Transactional
@Async("refreshMetadataTaskExecutor")
fun refreshMetadata(book: Book) {
logger.info { "Refresh metadata for book: $book" }
val loadedBook = bookRepository.findByIdOrNull(book.id)
bookMetadataProviders.forEach {
it.getBookMetadataFromBook(book)?.let { bPatch ->
metadataApplier.apply(bPatch, book)
bookRepository.save(book)
loadedBook?.let { bookToPatch ->
bookMetadataProviders.forEach {
val patch = it.getBookMetadataFromBook(bookToPatch)
patch?.let { bPatch ->
metadataApplier.apply(bPatch, bookToPatch)
bookRepository.save(bookToPatch)
bPatch.series?.let { sPatch ->
metadataApplier.apply(sPatch, bookToPatch.series)
seriesRepository.save(bookToPatch.series)
}
bPatch.series?.let { sPatch ->
metadataApplier.apply(sPatch, book.series)
seriesRepository.save(book.series)
}
}
}

View file

@ -0,0 +1,23 @@
package org.gotson.komga.application.tasks
import java.io.Serializable
sealed class Task : Serializable {
abstract fun uniqueId(): String
data class ScanLibrary(val libraryId: Long) : Task() {
override fun uniqueId() = "SCAN_LIBRARY_$libraryId"
}
data class AnalyzeBook(val bookId: Long) : Task() {
override fun uniqueId() = "ANALYZE_BOOK_$bookId"
}
data class GenerateBookThumbnail(val bookId: Long) : Task() {
override fun uniqueId() = "GENERATE_BOOK_THUMBNAIL_$bookId"
}
data class RefreshBookMetadata(val bookId: Long) : Task() {
override fun uniqueId() = "REFRESH_BOOK_METADATA_$bookId"
}
}

View file

@ -0,0 +1,67 @@
package org.gotson.komga.application.tasks
import mu.KotlinLogging
import org.gotson.komga.application.service.BookLifecycle
import org.gotson.komga.application.service.MetadataLifecycle
import org.gotson.komga.domain.persistence.BookRepository
import org.gotson.komga.domain.persistence.LibraryRepository
import org.gotson.komga.domain.service.LibraryScanner
import org.gotson.komga.infrastructure.jms.QUEUE_TASKS
import org.gotson.komga.infrastructure.jms.QUEUE_TASKS_SELECTOR
import org.springframework.data.repository.findByIdOrNull
import org.springframework.jms.annotation.JmsListener
import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional
import kotlin.time.measureTime
private val logger = KotlinLogging.logger {}
@Service
class TaskHandler(
private val taskReceiver: TaskReceiver,
private val libraryRepository: LibraryRepository,
private val bookRepository: BookRepository,
private val libraryScanner: LibraryScanner,
private val bookLifecycle: BookLifecycle,
private val metadataLifecycle: MetadataLifecycle
) {
@JmsListener(destination = QUEUE_TASKS, selector = QUEUE_TASKS_SELECTOR)
@Transactional
fun handleTask(task: Task) {
logger.info { "Executing task: $task" }
try {
measureTime {
when (task) {
is Task.ScanLibrary ->
libraryRepository.findByIdOrNull(task.libraryId)?.let {
libraryScanner.scanRootFolder(it)
taskReceiver.analyzeUnknownBooks(it)
} ?: logger.warn { "Cannot execute task $task: Library does not exist" }
is Task.AnalyzeBook ->
bookRepository.findByIdOrNull(task.bookId)?.let {
bookLifecycle.analyzeAndPersist(it)
taskReceiver.refreshBookMetadata(it)
} ?: logger.warn { "Cannot execute task $task: Book does not exist" }
is Task.GenerateBookThumbnail ->
bookRepository.findByIdOrNull(task.bookId)?.let {
bookLifecycle.regenerateThumbnailAndPersist(it)
} ?: logger.warn { "Cannot execute task $task: Book does not exist" }
is Task.RefreshBookMetadata ->
bookRepository.findByIdOrNull(task.bookId)?.let {
metadataLifecycle.refreshMetadata(it)
} ?: logger.warn { "Cannot execute task $task: Book does not exist" }
}
}.also {
logger.info { "Task $task executed in $it" }
}
} catch (e: Exception) {
logger.error(e) { "Task $task execution failed" }
}
}
}

View file

@ -0,0 +1,60 @@
package org.gotson.komga.application.tasks
import mu.KotlinLogging
import org.gotson.komga.domain.model.Book
import org.gotson.komga.domain.model.Library
import org.gotson.komga.domain.model.Media
import org.gotson.komga.domain.persistence.BookRepository
import org.gotson.komga.domain.persistence.LibraryRepository
import org.gotson.komga.infrastructure.jms.QUEUE_TASKS
import org.gotson.komga.infrastructure.jms.QUEUE_TASKS_TYPE
import org.gotson.komga.infrastructure.jms.QUEUE_TYPE
import org.gotson.komga.infrastructure.jms.QUEUE_UNIQUE_ID
import org.springframework.jms.core.JmsTemplate
import org.springframework.stereotype.Service
private val logger = KotlinLogging.logger {}
@Service
class TaskReceiver(
private val jmsTemplate: JmsTemplate,
private val libraryRepository: LibraryRepository,
private val bookRepository: BookRepository
) {
fun scanLibraries() {
libraryRepository.findAll().forEach { scanLibrary(it) }
}
fun scanLibrary(library: Library) {
submitTask(Task.ScanLibrary(library.id))
}
fun analyzeUnknownBooks(library: Library) {
bookRepository.findAllByMediaStatusAndSeriesLibrary(Media.Status.UNKNOWN, library).forEach {
submitTask(Task.AnalyzeBook(it.id))
}
}
fun analyzeBook(book: Book) {
submitTask(Task.AnalyzeBook(book.id))
}
fun generateBookThumbnail(book: Book) {
submitTask(Task.GenerateBookThumbnail(book.id))
}
fun refreshBookMetadata(book: Book) {
submitTask(Task.RefreshBookMetadata(book.id))
}
private fun submitTask(task: Task) {
logger.info { "Sending task: $task" }
jmsTemplate.convertAndSend(QUEUE_TASKS, task) {
it.apply {
setStringProperty(QUEUE_TYPE, QUEUE_TASKS_TYPE)
setStringProperty(QUEUE_UNIQUE_ID, task.uniqueId())
}
}
}
}

View file

@ -28,8 +28,9 @@ interface BookRepository : JpaRepository<Book, Long>, JpaSpecificationExecutor<B
fun findBySeriesLibraryIn(seriesLibrary: Collection<Library>, pageable: Pageable): Page<Book>
fun findBySeriesLibraryIn(seriesLibrary: Collection<Library>): List<Book>
fun findBySeriesLibrary(seriesLibrary: Library): List<Book>
fun findByUrl(url: URL): Book?
fun findAllByMediaStatus(status: Media.Status): List<Book>
fun findAllByMediaStatusAndSeriesLibrary(status: Media.Status, library: Library): List<Book>
fun findAllByMediaThumbnailIsNull(): List<Book>
}

View file

@ -1,18 +1,13 @@
package org.gotson.komga.domain.service
import mu.KotlinLogging
import org.apache.commons.lang3.time.DurationFormatUtils
import org.gotson.komga.application.service.BookLifecycle
import org.gotson.komga.application.service.MetadataLifecycle
import org.gotson.komga.domain.model.Library
import org.gotson.komga.domain.model.Media
import org.gotson.komga.domain.persistence.BookRepository
import org.gotson.komga.domain.persistence.SeriesRepository
import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional
import java.nio.file.Paths
import java.time.temporal.ChronoUnit
import kotlin.system.measureTimeMillis
private val logger = KotlinLogging.logger {}
@ -20,86 +15,57 @@ private val logger = KotlinLogging.logger {}
class LibraryScanner(
private val fileSystemScanner: FileSystemScanner,
private val seriesRepository: SeriesRepository,
private val bookRepository: BookRepository,
private val bookLifecycle: BookLifecycle,
private val metadataLifecycle: MetadataLifecycle
private val bookRepository: BookRepository
) {
@Transactional
fun scanRootFolder(library: Library) {
logger.info { "Updating library: ${library.name}, root folder: ${library.root}" }
measureTimeMillis {
val scannedSeries = fileSystemScanner.scanRootFolder(Paths.get(library.root.toURI()))
val scannedSeries = fileSystemScanner.scanRootFolder(Paths.get(library.root.toURI()))
// delete series that don't exist anymore
if (scannedSeries.isEmpty()) {
logger.info { "Scan returned no series, deleting all existing series" }
seriesRepository.deleteByLibraryId(library.id)
// delete series that don't exist anymore
if (scannedSeries.isEmpty()) {
logger.info { "Scan returned no series, deleting all existing series" }
seriesRepository.deleteByLibraryId(library.id)
} else {
scannedSeries.map { it.url }.let { urls ->
seriesRepository.findByLibraryIdAndUrlNotIn(library.id, urls).forEach {
logger.info { "Deleting series not on disk anymore: $it" }
seriesRepository.delete(it)
}
}
}
scannedSeries.forEach { newSeries ->
val existingSeries = seriesRepository.findByLibraryIdAndUrl(library.id, newSeries.url)
// if series does not exist, save it
if (existingSeries == null) {
logger.info { "Adding new series: $newSeries" }
seriesRepository.save(newSeries.also { it.library = library })
} else {
scannedSeries.map { it.url }.let { urls ->
seriesRepository.findByLibraryIdAndUrlNotIn(library.id, urls).forEach {
logger.info { "Deleting series not on disk anymore: $it" }
seriesRepository.delete(it)
}
// if series already exists, update it
if (newSeries.fileLastModified.truncatedTo(ChronoUnit.MILLIS) != existingSeries.fileLastModified.truncatedTo(ChronoUnit.MILLIS)) {
logger.info { "Series changed on disk, updating: $newSeries" }
existingSeries.fileLastModified = newSeries.fileLastModified
// update list of books with existing entities if they exist
existingSeries.books = newSeries.books.map { newBook ->
val existingBook = bookRepository.findByUrl(newBook.url) ?: newBook
if (newBook.fileLastModified.truncatedTo(ChronoUnit.MILLIS) != existingBook.fileLastModified.truncatedTo(ChronoUnit.MILLIS)) {
logger.info { "Book changed on disk, update and reset media status: $newBook" }
existingBook.fileLastModified = newBook.fileLastModified
existingBook.fileSize = newBook.fileSize
existingBook.media.reset()
}
existingBook
}.toMutableList()
seriesRepository.save(existingSeries)
}
}
scannedSeries.forEach { newSeries ->
val existingSeries = seriesRepository.findByLibraryIdAndUrl(library.id, newSeries.url)
// if series does not exist, save it
if (existingSeries == null) {
logger.info { "Adding new series: $newSeries" }
seriesRepository.save(newSeries.also { it.library = library })
} else {
// if series already exists, update it
if (newSeries.fileLastModified.truncatedTo(ChronoUnit.MILLIS) != existingSeries.fileLastModified.truncatedTo(ChronoUnit.MILLIS)) {
logger.info { "Series changed on disk, updating: $newSeries" }
existingSeries.fileLastModified = newSeries.fileLastModified
// update list of books with existing entities if they exist
existingSeries.books = newSeries.books.map { newBook ->
val existingBook = bookRepository.findByUrl(newBook.url) ?: newBook
if (newBook.fileLastModified.truncatedTo(ChronoUnit.MILLIS) != existingBook.fileLastModified.truncatedTo(ChronoUnit.MILLIS)) {
logger.info { "Book changed on disk, update and reset media status: $newBook" }
existingBook.fileLastModified = newBook.fileLastModified
existingBook.fileSize = newBook.fileSize
existingBook.media.reset()
}
existingBook
}.toMutableList()
seriesRepository.save(existingSeries)
}
}
}
}.also { logger.info { "Library update finished in ${DurationFormatUtils.formatDurationHMS(it)}" } }
}
fun analyzeUnknownBooks() {
logger.info { "Analyze all books in status: unknown" }
val booksToAnalyze = bookRepository.findAllByMediaStatus(Media.Status.UNKNOWN)
var sumOfTasksTime = 0L
measureTimeMillis {
sumOfTasksTime = booksToAnalyze
.map { bookLifecycle.analyzeAndPersist(it) }
.map {
try {
it.get()
} catch (ex: Exception) {
0L
}
}
.sum()
}.also {
logger.info { "Analyzed ${booksToAnalyze.size} books in ${DurationFormatUtils.formatDurationHMS(it)} (virtual: ${DurationFormatUtils.formatDurationHMS(sumOfTasksTime)})" }
}
logger.info { "Refresh metadata for all books analyzed" }
booksToAnalyze.forEach {
metadataLifecycle.refreshMetadata(it)
}
}
}

View file

@ -1,55 +0,0 @@
package org.gotson.komga.infrastructure.async
import org.gotson.komga.infrastructure.configuration.KomgaProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.scheduling.annotation.EnableAsync
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
import java.util.concurrent.Executor
@Configuration
@EnableAsync
class AsyncConfiguration(
private val komgaProperties: KomgaProperties
) {
@Bean("analyzeBookTaskExecutor")
fun analyzeBookTaskExecutor(): Executor =
ThreadPoolTaskExecutor().apply {
corePoolSize = komgaProperties.threads.analyzer
}
@Bean("periodicScanTaskExecutor")
fun periodicScanTaskExecutor(): Executor =
ThreadPoolTaskExecutor().apply {
corePoolSize = 1
maxPoolSize = 1
setQueueCapacity(0)
}
@Bean("regenerateThumbnailsTaskExecutor")
fun regenerateThumbnailsTaskExecutor(): Executor =
ThreadPoolTaskExecutor().apply {
corePoolSize = 1
maxPoolSize = 1
setQueueCapacity(0)
}
@Bean("reAnalyzeBooksTaskExecutor")
fun reAnalyzeBooksTaskExecutor(): Executor =
ThreadPoolTaskExecutor().apply {
corePoolSize = 1
}
@Bean("refreshMetadataTaskExecutor")
fun refreshMetadataTaskExecutor(): Executor =
ThreadPoolTaskExecutor().apply {
corePoolSize = 1
}
@Bean("reRefreshMetadataTaskExecutor")
fun reRefreshMetadataTaskExecutor(): Executor =
ThreadPoolTaskExecutor().apply {
corePoolSize = 1
}
}

View file

@ -0,0 +1,32 @@
package org.gotson.komga.infrastructure.jms
import org.apache.activemq.artemis.api.core.RoutingType
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration
import org.apache.activemq.artemis.core.settings.impl.AddressSettings
import org.springframework.boot.autoconfigure.jms.artemis.ArtemisConfigurationCustomizer
import org.springframework.context.annotation.Configuration
const val QUEUE_UNIQUE_ID = "unique_id"
const val QUEUE_TYPE = "type"
const val QUEUE_TASKS = "tasks.background"
const val QUEUE_TASKS_TYPE = "task"
const val QUEUE_TASKS_SELECTOR = "$QUEUE_TYPE = '$QUEUE_TASKS_TYPE'"
@Configuration
class ArtemisConfig : ArtemisConfigurationCustomizer {
override fun customize(configuration: org.apache.activemq.artemis.core.config.Configuration?) {
configuration?.let {
// disable prefetch, ensures messages stay in the queue and last value can have desired effect
it.addAddressesSetting(QUEUE_TASKS, AddressSettings().apply {
defaultConsumerWindowSize = 0
})
it.addQueueConfiguration(
CoreQueueConfiguration()
.setAddress(QUEUE_TASKS)
.setName(QUEUE_TASKS)
.setLastValueKey(QUEUE_UNIQUE_ID)
.setRoutingType(RoutingType.ANYCAST)
)
}
}
}

View file

@ -1,7 +1,7 @@
package org.gotson.komga.interfaces.rest
import mu.KotlinLogging
import org.gotson.komga.application.service.AsyncOrchestrator
import org.gotson.komga.application.tasks.TaskReceiver
import org.gotson.komga.domain.persistence.BookRepository
import org.springframework.http.HttpStatus
import org.springframework.security.access.prepost.PreAuthorize
@ -9,8 +9,6 @@ import org.springframework.web.bind.annotation.PostMapping
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.ResponseStatus
import org.springframework.web.bind.annotation.RestController
import org.springframework.web.server.ResponseStatusException
import java.util.concurrent.RejectedExecutionException
private val logger = KotlinLogging.logger {}
@ -18,29 +16,21 @@ private val logger = KotlinLogging.logger {}
@RequestMapping("api/v1/admin")
@PreAuthorize("hasRole('ADMIN')")
class AdminController(
private val asyncOrchestrator: AsyncOrchestrator,
private val bookRepository: BookRepository
private val bookRepository: BookRepository,
private val taskReceiver: TaskReceiver
) {
@PostMapping("rpc/thumbnails/regenerate/all")
@ResponseStatus(HttpStatus.ACCEPTED)
fun regenerateAllThumbnails() {
try {
logger.info { "Regenerate thumbnail for all books" }
asyncOrchestrator.generateThumbnails(bookRepository.findAll())
} catch (e: RejectedExecutionException) {
throw ResponseStatusException(HttpStatus.SERVICE_UNAVAILABLE, "Thumbnail regeneration task is already running")
}
logger.info { "Regenerate thumbnail for all books" }
bookRepository.findAll().forEach { taskReceiver.generateBookThumbnail(it) }
}
@PostMapping("rpc/thumbnails/regenerate/missing")
@ResponseStatus(HttpStatus.ACCEPTED)
fun regenerateMissingThumbnails() {
try {
logger.info { "Regenerate missing thumbnails" }
asyncOrchestrator.generateThumbnails(bookRepository.findAllByMediaThumbnailIsNull())
} catch (e: RejectedExecutionException) {
throw ResponseStatusException(HttpStatus.SERVICE_UNAVAILABLE, "Thumbnail regeneration task is already running")
}
logger.info { "Regenerate missing thumbnails" }
bookRepository.findAllByMediaThumbnailIsNull().forEach { taskReceiver.generateBookThumbnail(it) }
}
}

View file

@ -9,8 +9,8 @@ import io.swagger.v3.oas.annotations.media.Content
import io.swagger.v3.oas.annotations.media.Schema
import io.swagger.v3.oas.annotations.responses.ApiResponse
import mu.KotlinLogging
import org.gotson.komga.application.service.AsyncOrchestrator
import org.gotson.komga.application.service.BookLifecycle
import org.gotson.komga.application.tasks.TaskReceiver
import org.gotson.komga.domain.model.Author
import org.gotson.komga.domain.model.Book
import org.gotson.komga.domain.model.BookMetadata
@ -57,7 +57,6 @@ import org.springframework.web.server.ResponseStatusException
import java.io.FileNotFoundException
import java.nio.file.NoSuchFileException
import java.time.ZoneOffset
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.TimeUnit
import javax.persistence.criteria.JoinType
import javax.validation.Valid
@ -69,7 +68,7 @@ private val logger = KotlinLogging.logger {}
class BookController(
private val bookRepository: BookRepository,
private val bookLifecycle: BookLifecycle,
private val asyncOrchestrator: AsyncOrchestrator
private val taskReceiver: TaskReceiver
) {
@PageableAsQueryParam
@ -345,11 +344,7 @@ class BookController(
@ResponseStatus(HttpStatus.ACCEPTED)
fun analyze(@PathVariable bookId: Long) {
bookRepository.findByIdOrNull(bookId)?.let { book ->
try {
asyncOrchestrator.reAnalyzeBooks(listOf(book))
} catch (e: RejectedExecutionException) {
throw ResponseStatusException(HttpStatus.SERVICE_UNAVAILABLE, "Another book analysis task is already running")
}
taskReceiver.analyzeBook(book)
} ?: throw ResponseStatusException(HttpStatus.NOT_FOUND)
}
@ -358,11 +353,7 @@ class BookController(
@ResponseStatus(HttpStatus.ACCEPTED)
fun refreshMetadata(@PathVariable bookId: Long) {
bookRepository.findByIdOrNull(bookId)?.let { book ->
try {
asyncOrchestrator.refreshBooksMetadata(listOf(book))
} catch (e: RejectedExecutionException) {
throw ResponseStatusException(HttpStatus.SERVICE_UNAVAILABLE, "Another metadata refresh task is already running")
}
taskReceiver.refreshBookMetadata(book)
} ?: throw ResponseStatusException(HttpStatus.NOT_FOUND)
}

View file

@ -1,8 +1,8 @@
package org.gotson.komga.interfaces.rest
import mu.KotlinLogging
import org.gotson.komga.application.service.AsyncOrchestrator
import org.gotson.komga.application.service.LibraryLifecycle
import org.gotson.komga.application.tasks.TaskReceiver
import org.gotson.komga.domain.model.DirectoryNotFoundException
import org.gotson.komga.domain.model.DuplicateNameException
import org.gotson.komga.domain.model.Library
@ -25,7 +25,6 @@ import org.springframework.web.bind.annotation.ResponseStatus
import org.springframework.web.bind.annotation.RestController
import org.springframework.web.server.ResponseStatusException
import java.io.FileNotFoundException
import java.util.concurrent.RejectedExecutionException
import javax.validation.Valid
import javax.validation.constraints.NotBlank
@ -37,7 +36,7 @@ class LibraryController(
private val libraryLifecycle: LibraryLifecycle,
private val libraryRepository: LibraryRepository,
private val bookRepository: BookRepository,
private val asyncOrchestrator: AsyncOrchestrator
private val taskReceiver: TaskReceiver
) {
@GetMapping
@ -93,11 +92,7 @@ class LibraryController(
@ResponseStatus(HttpStatus.ACCEPTED)
fun scan(@PathVariable libraryId: Long) {
libraryRepository.findByIdOrNull(libraryId)?.let { library ->
try {
asyncOrchestrator.scanAndAnalyzeOneLibrary(library)
} catch (e: RejectedExecutionException) {
throw ResponseStatusException(HttpStatus.SERVICE_UNAVAILABLE, "Another scan task is already running")
}
taskReceiver.scanLibrary(library)
} ?: throw ResponseStatusException(HttpStatus.NOT_FOUND)
}
@ -106,11 +101,7 @@ class LibraryController(
@ResponseStatus(HttpStatus.ACCEPTED)
fun analyze(@PathVariable libraryId: Long) {
libraryRepository.findByIdOrNull(libraryId)?.let { library ->
try {
asyncOrchestrator.reAnalyzeBooks(bookRepository.findBySeriesLibraryIn(listOf(library)))
} catch (e: RejectedExecutionException) {
throw ResponseStatusException(HttpStatus.SERVICE_UNAVAILABLE, "Another book analysis task is already running")
}
bookRepository.findBySeriesLibrary(library).forEach { taskReceiver.analyzeBook(it) }
} ?: throw ResponseStatusException(HttpStatus.NOT_FOUND)
}
@ -119,11 +110,7 @@ class LibraryController(
@ResponseStatus(HttpStatus.ACCEPTED)
fun refreshMetadata(@PathVariable libraryId: Long) {
libraryRepository.findByIdOrNull(libraryId)?.let { library ->
try {
asyncOrchestrator.refreshBooksMetadata(bookRepository.findBySeriesLibraryIn(listOf(library)))
} catch (e: RejectedExecutionException) {
throw ResponseStatusException(HttpStatus.SERVICE_UNAVAILABLE, "Another metadata refresh task is already running")
}
bookRepository.findBySeriesLibrary(library).forEach { taskReceiver.refreshBookMetadata(it) }
} ?: throw ResponseStatusException(HttpStatus.NOT_FOUND)
}
}

View file

@ -9,7 +9,7 @@ import io.swagger.v3.oas.annotations.media.Content
import io.swagger.v3.oas.annotations.media.Schema
import io.swagger.v3.oas.annotations.responses.ApiResponse
import mu.KotlinLogging
import org.gotson.komga.application.service.AsyncOrchestrator
import org.gotson.komga.application.tasks.TaskReceiver
import org.gotson.komga.domain.model.Library
import org.gotson.komga.domain.model.Media
import org.gotson.komga.domain.model.Series
@ -45,7 +45,6 @@ import org.springframework.web.bind.annotation.RequestParam
import org.springframework.web.bind.annotation.ResponseStatus
import org.springframework.web.bind.annotation.RestController
import org.springframework.web.server.ResponseStatusException
import java.util.concurrent.RejectedExecutionException
import javax.validation.Valid
private val logger = KotlinLogging.logger {}
@ -56,7 +55,7 @@ class SeriesController(
private val seriesRepository: SeriesRepository,
private val bookRepository: BookRepository,
private val bookController: BookController,
private val asyncOrchestrator: AsyncOrchestrator
private val taskReceiver: TaskReceiver
) {
@GetMapping
@ -222,11 +221,7 @@ class SeriesController(
@ResponseStatus(HttpStatus.ACCEPTED)
fun analyze(@PathVariable seriesId: Long) {
seriesRepository.findByIdOrNull(seriesId)?.let { series ->
try {
asyncOrchestrator.reAnalyzeBooks(series.books)
} catch (e: RejectedExecutionException) {
throw ResponseStatusException(HttpStatus.SERVICE_UNAVAILABLE, "Another book analysis task is already running")
}
series.books.forEach { taskReceiver.analyzeBook(it) }
} ?: throw ResponseStatusException(HttpStatus.NOT_FOUND)
}
@ -235,11 +230,7 @@ class SeriesController(
@ResponseStatus(HttpStatus.ACCEPTED)
fun refreshMetadata(@PathVariable seriesId: Long) {
seriesRepository.findByIdOrNull(seriesId)?.let { series ->
try {
asyncOrchestrator.refreshBooksMetadata(series.books)
} catch (e: RejectedExecutionException) {
throw ResponseStatusException(HttpStatus.SERVICE_UNAVAILABLE, "Another metadata refresh task is already running")
}
series.books.forEach { taskReceiver.refreshBookMetadata(it) }
} ?: throw ResponseStatusException(HttpStatus.NOT_FOUND)
}

View file

@ -1,29 +1,24 @@
package org.gotson.komga.interfaces.scheduler
import mu.KotlinLogging
import org.gotson.komga.application.service.AsyncOrchestrator
import org.gotson.komga.application.tasks.TaskReceiver
import org.springframework.boot.context.event.ApplicationReadyEvent
import org.springframework.context.annotation.Profile
import org.springframework.context.event.EventListener
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Controller
import java.util.concurrent.RejectedExecutionException
private val logger = KotlinLogging.logger {}
@Profile("!test")
@Controller
class PeriodicScannerController(
private val asyncOrchestrator: AsyncOrchestrator
private val taskReceiver: TaskReceiver
) {
@EventListener(ApplicationReadyEvent::class)
@Scheduled(cron = "#{@komgaProperties.librariesScanCron ?: '-'}")
fun scanRootFolder() {
try {
asyncOrchestrator.scanAndAnalyzeAllLibraries()
} catch (e: RejectedExecutionException) {
logger.warn { "Another scan is already running, skipping" }
}
fun scanAllLibraries() {
taskReceiver.scanLibraries()
}
}

View file

@ -21,14 +21,15 @@ logging:
max-history: 1
name: komga-dev.log
level:
org.gotson.komga: DEBUG
web: DEBUG
# org.hibernate:
# stat: DEBUG
# SQL: DEBUG
# cache: DEBUG
# type.descriptor.sql.BasicBinder: TRACE
# org.springframework.security.web.FilterChainProxy: DEBUG
org.apache.activemq.audit.message: WARN
# web: DEBUG
# org.gotson.komga: DEBUG
# org.springframework.jms: DEBUG
# org.springframework.security.web.FilterChainProxy: DEBUG
# org.hibernate.stat: DEBUG
# org.hibernate.SQL: DEBUG
# org.hibernate.cache: DEBUG
# org.hibernate.type.descriptor.sql.BasicBinder: TRACE
management.metrics.export.influx:
# enabled: true

View file

@ -4,6 +4,8 @@ logging:
file:
max-history: 10
name: komga.log
level:
org.apache.activemq.audit.message: WARN
komga:
libraries-scan-cron: "0 */15 * * * ?"

View file

@ -196,7 +196,7 @@ class LibraryScannerTest(
libraryScanner.scanRootFolder(library)
every { mockAnalyzer.analyze(any()) } returns Media(status = Media.Status.READY, mediaType = "application/zip", pages = mutableListOf(makeBookPage("1.jpg"), makeBookPage("2.jpg")))
bookRepository.findAll().map { bookLifecycle.analyzeAndPersist(it) }.map { it.get() }
bookRepository.findAll().map { bookLifecycle.analyzeAndPersist(it) }
// when
libraryScanner.scanRootFolder(library)

View file

@ -0,0 +1,95 @@
package org.gotson.komga.infrastructure.jms
import mu.KotlinLogging
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtendWith
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.jms.core.JmsTemplate
import org.springframework.jms.support.destination.JmsDestinationAccessor
import org.springframework.test.context.junit.jupiter.SpringExtension
import javax.jms.QueueBrowser
import javax.jms.Session
private val logger = KotlinLogging.logger {}
@ExtendWith(SpringExtension::class)
@SpringBootTest
class ArtemisConfigTest(
@Autowired private val jmsTemplate: JmsTemplate
) {
init {
jmsTemplate.receiveTimeout = JmsDestinationAccessor.RECEIVE_TIMEOUT_NO_WAIT
}
@BeforeEach
fun emptyQueue() {
while (jmsTemplate.receive(QUEUE_TASKS) != null) {
logger.info { "Emptying queue" }
}
}
@Test
fun `when sending messages with the same unique id then messages are deduplicated`() {
for (i in 1..5) {
jmsTemplate.convertAndSend(
QUEUE_TASKS,
"message $i"
) {
it.apply { setStringProperty(QUEUE_UNIQUE_ID, "1") }
}
}
val size = jmsTemplate.browse(QUEUE_TASKS) { _: Session, browser: QueueBrowser ->
browser.enumeration.toList().size
}
val msg = jmsTemplate.receiveAndConvert(QUEUE_TASKS) as String
assertThat(msg).isEqualTo("message 5")
assertThat(size).isEqualTo(1)
}
@Test
fun `when sending messages with some common unique id then messages are deduplicated`() {
for (i in 1..6) {
jmsTemplate.convertAndSend(
QUEUE_TASKS,
"message $i"
) {
it.apply { setStringProperty(QUEUE_UNIQUE_ID, i.rem(2).toString()) }
}
}
val size = jmsTemplate.browse(QUEUE_TASKS) { _: Session, browser: QueueBrowser ->
browser.enumeration.toList().size
}
val msg = jmsTemplate.receiveAndConvert(QUEUE_TASKS) as String
assertThat(msg).isEqualTo("message 5")
assertThat(size).isEqualTo(2)
}
@Test
fun `when sending messages without unique id then messages are not deduplicated`() {
for (i in 1..5) {
jmsTemplate.convertAndSend(
QUEUE_TASKS,
"message $i"
)
}
val size = jmsTemplate.browse(QUEUE_TASKS) { _: Session, browser: QueueBrowser ->
browser.enumeration.toList().size
}
val msg = jmsTemplate.receiveAndConvert(QUEUE_TASKS) as String
assertThat(msg).isEqualTo("message 1")
assertThat(size).isEqualTo(5)
}
}