diff --git a/komga/build.gradle.kts b/komga/build.gradle.kts index 3f7ad0c49..84c7d63ff 100644 --- a/komga/build.gradle.kts +++ b/komga/build.gradle.kts @@ -38,9 +38,12 @@ dependencies { implementation("org.springframework.boot:spring-boot-starter-cache") implementation("org.springframework.boot:spring-boot-starter-security") implementation("org.springframework.boot:spring-boot-starter-thymeleaf") + implementation("org.springframework.boot:spring-boot-starter-artemis") kapt("org.springframework.boot:spring-boot-configuration-processor") + implementation("org.apache.activemq:artemis-jms-server") + implementation("org.flywaydb:flyway-core") implementation("org.hibernate:hibernate-jcache") @@ -93,6 +96,7 @@ dependencies { testImplementation("com.tngtech.archunit:archunit-junit5:0.13.1") + developmentOnly("org.springframework.boot:spring-boot-devtools") } @@ -100,7 +104,7 @@ tasks { withType { kotlinOptions { jvmTarget = "1.8" - freeCompilerArgs = listOf("-Xjsr305=strict") + freeCompilerArgs = listOf("-Xjsr305=strict", "-Xopt-in=kotlin.time.ExperimentalTime") } } diff --git a/komga/src/main/kotlin/org/gotson/komga/application/service/AsyncOrchestrator.kt b/komga/src/main/kotlin/org/gotson/komga/application/service/AsyncOrchestrator.kt deleted file mode 100644 index f6add9207..000000000 --- a/komga/src/main/kotlin/org/gotson/komga/application/service/AsyncOrchestrator.kt +++ /dev/null @@ -1,89 +0,0 @@ -package org.gotson.komga.application.service - -import mu.KotlinLogging -import org.apache.commons.lang3.time.DurationFormatUtils -import org.gotson.komga.domain.model.Book -import org.gotson.komga.domain.model.Library -import org.gotson.komga.domain.persistence.BookRepository -import org.gotson.komga.domain.persistence.LibraryRepository -import org.gotson.komga.domain.persistence.SeriesRepository -import org.gotson.komga.domain.service.LibraryScanner -import org.springframework.scheduling.annotation.Async -import org.springframework.stereotype.Service -import org.springframework.transaction.annotation.Transactional -import kotlin.system.measureTimeMillis - -private val logger = KotlinLogging.logger {} - -@Service -class AsyncOrchestrator( - private val libraryScanner: LibraryScanner, - private val libraryRepository: LibraryRepository, - private val bookRepository: BookRepository, - private val bookLifecycle: BookLifecycle, - private val seriesRepository: SeriesRepository, - private val metadataLifecycle: MetadataLifecycle -) { - - @Async("periodicScanTaskExecutor") - fun scanAndAnalyzeAllLibraries() { - logger.info { "Starting periodic libraries scan" } - val libraries = libraryRepository.findAll() - - if (libraries.isEmpty()) { - logger.info { "No libraries defined, nothing to scan" } - } else { - libraries.forEach { - libraryScanner.scanRootFolder(it) - } - - logger.info { "Starting periodic book parsing" } - libraryScanner.analyzeUnknownBooks() - } - } - - @Async("periodicScanTaskExecutor") - fun scanAndAnalyzeOneLibrary(library: Library) { - libraryScanner.scanRootFolder(library) - libraryScanner.analyzeUnknownBooks() - } - - @Async("regenerateThumbnailsTaskExecutor") - @Transactional - fun generateThumbnails(books: List) { - val loadedBooks = bookRepository.findAllById(books.map { it.id }) - var sumOfTasksTime = 0L - measureTimeMillis { - sumOfTasksTime = loadedBooks - .map { bookLifecycle.regenerateThumbnailAndPersist(it) } - .map { - try { - it.get() - } catch (ex: Exception) { - 0L - } - } - .sum() - }.also { - logger.info { "Generated ${loadedBooks.size} thumbnails in ${DurationFormatUtils.formatDurationHMS(it)} (virtual: ${DurationFormatUtils.formatDurationHMS(sumOfTasksTime)})" } - } - } - - @Async("reAnalyzeBooksTaskExecutor") - @Transactional - fun reAnalyzeBooks(books: List) { - val loadedBooks = bookRepository.findAllById(books.map { it.id }) - loadedBooks.forEach { it.media.reset() } - bookRepository.saveAll(loadedBooks) - - loadedBooks.map { bookLifecycle.analyzeAndPersist(it) } - } - - @Async("reRefreshMetadataTaskExecutor") - @Transactional - fun refreshBooksMetadata(books: List) { - bookRepository - .findAllById(books.map { it.id }) - .forEach { metadataLifecycle.refreshMetadata(it) } - } -} diff --git a/komga/src/main/kotlin/org/gotson/komga/application/service/BookLifecycle.kt b/komga/src/main/kotlin/org/gotson/komga/application/service/BookLifecycle.kt index ed1f4468c..9657006ec 100644 --- a/komga/src/main/kotlin/org/gotson/komga/application/service/BookLifecycle.kt +++ b/komga/src/main/kotlin/org/gotson/komga/application/service/BookLifecycle.kt @@ -1,7 +1,6 @@ package org.gotson.komga.application.service import mu.KotlinLogging -import org.apache.commons.lang3.time.DurationFormatUtils import org.gotson.komga.domain.model.Book import org.gotson.komga.domain.model.BookPageContent import org.gotson.komga.domain.model.ImageConversionException @@ -11,12 +10,7 @@ import org.gotson.komga.domain.persistence.BookRepository import org.gotson.komga.domain.service.BookAnalyzer import org.gotson.komga.infrastructure.image.ImageConverter import org.gotson.komga.infrastructure.image.ImageType -import org.springframework.scheduling.annotation.Async -import org.springframework.scheduling.annotation.AsyncResult import org.springframework.stereotype.Service -import org.springframework.transaction.annotation.Transactional -import java.util.concurrent.Future -import kotlin.system.measureTimeMillis private val logger = KotlinLogging.logger {} @@ -27,34 +21,26 @@ class BookLifecycle( private val imageConverter: ImageConverter ) { - @Transactional - @Async("analyzeBookTaskExecutor") - fun analyzeAndPersist(book: Book): Future { + fun analyzeAndPersist(book: Book) { logger.info { "Analyze and persist book: $book" } - return AsyncResult(measureTimeMillis { - try { - book.media = bookAnalyzer.analyze(book) - } catch (ex: Exception) { - logger.error(ex) { "Error while analyzing book: $book" } - book.media = Media(status = Media.Status.ERROR, comment = ex.message) - } - bookRepository.save(book) - }.also { logger.info { "Parsing finished in ${DurationFormatUtils.formatDurationHMS(it)}" } }) + try { + book.media = bookAnalyzer.analyze(book) + } catch (ex: Exception) { + logger.error(ex) { "Error while analyzing book: $book" } + book.media = Media(status = Media.Status.ERROR, comment = ex.message) + } + bookRepository.save(book) } - @Transactional - @Async("analyzeBookTaskExecutor") - fun regenerateThumbnailAndPersist(book: Book): Future { + fun regenerateThumbnailAndPersist(book: Book) { logger.info { "Regenerate thumbnail and persist book: $book" } - return AsyncResult(measureTimeMillis { - try { - book.media = bookAnalyzer.regenerateThumbnail(book) - } catch (ex: Exception) { - logger.error(ex) { "Error while recreating thumbnail" } - book.media = Media(status = Media.Status.ERROR) - } - bookRepository.save(book) - }.also { logger.info { "Thumbnail generated in ${DurationFormatUtils.formatDurationHMS(it)}" } }) + try { + book.media = bookAnalyzer.regenerateThumbnail(book) + } catch (ex: Exception) { + logger.error(ex) { "Error while recreating thumbnail" } + book.media = Media(status = Media.Status.ERROR) + } + bookRepository.save(book) } @Throws( diff --git a/komga/src/main/kotlin/org/gotson/komga/application/service/LibraryLifecycle.kt b/komga/src/main/kotlin/org/gotson/komga/application/service/LibraryLifecycle.kt index 9e24a40ba..19f1f7c4b 100644 --- a/komga/src/main/kotlin/org/gotson/komga/application/service/LibraryLifecycle.kt +++ b/komga/src/main/kotlin/org/gotson/komga/application/service/LibraryLifecycle.kt @@ -1,6 +1,7 @@ package org.gotson.komga.application.service import mu.KotlinLogging +import org.gotson.komga.application.tasks.TaskReceiver import org.gotson.komga.domain.model.DirectoryNotFoundException import org.gotson.komga.domain.model.DuplicateNameException import org.gotson.komga.domain.model.Library @@ -12,7 +13,6 @@ import org.springframework.stereotype.Service import org.springframework.transaction.annotation.Transactional import java.io.FileNotFoundException import java.nio.file.Files -import java.util.concurrent.RejectedExecutionException private val logger = KotlinLogging.logger {} @@ -21,7 +21,7 @@ class LibraryLifecycle( private val libraryRepository: LibraryRepository, private val seriesRepository: SeriesRepository, private val userRepository: KomgaUserRepository, - private val asyncOrchestrator: AsyncOrchestrator + private val taskReceiver: TaskReceiver ) { @Throws( @@ -49,15 +49,8 @@ class LibraryLifecycle( throw PathContainedInPath("Library path ${library.path()} is a parent of existing library ${it.name}: ${it.path()}") } - libraryRepository.save(library) - - logger.info { "Trying to launch a scan for the newly added library: ${library.name}" } - try { - asyncOrchestrator.scanAndAnalyzeAllLibraries() - } catch (e: RejectedExecutionException) { - logger.warn { "Another scan is already running, skipping" } - } + taskReceiver.scanLibrary(library) return library } diff --git a/komga/src/main/kotlin/org/gotson/komga/application/service/MetadataLifecycle.kt b/komga/src/main/kotlin/org/gotson/komga/application/service/MetadataLifecycle.kt index e2ae333c2..83029da32 100644 --- a/komga/src/main/kotlin/org/gotson/komga/application/service/MetadataLifecycle.kt +++ b/komga/src/main/kotlin/org/gotson/komga/application/service/MetadataLifecycle.kt @@ -6,41 +6,28 @@ import org.gotson.komga.domain.persistence.BookRepository import org.gotson.komga.domain.persistence.SeriesRepository import org.gotson.komga.domain.service.MetadataApplier import org.gotson.komga.infrastructure.metadata.BookMetadataProvider -import org.gotson.komga.infrastructure.metadata.comicinfo.ComicInfoProvider -import org.springframework.data.repository.findByIdOrNull -import org.springframework.scheduling.annotation.Async import org.springframework.stereotype.Service -import org.springframework.transaction.annotation.Transactional private val logger = KotlinLogging.logger {} @Service class MetadataLifecycle( - private val comicInfoProvider: ComicInfoProvider, private val bookMetadataProviders: List, private val metadataApplier: MetadataApplier, private val bookRepository: BookRepository, private val seriesRepository: SeriesRepository ) { - @Transactional - @Async("refreshMetadataTaskExecutor") fun refreshMetadata(book: Book) { logger.info { "Refresh metadata for book: $book" } - val loadedBook = bookRepository.findByIdOrNull(book.id) + bookMetadataProviders.forEach { + it.getBookMetadataFromBook(book)?.let { bPatch -> + metadataApplier.apply(bPatch, book) + bookRepository.save(book) - loadedBook?.let { bookToPatch -> - bookMetadataProviders.forEach { - val patch = it.getBookMetadataFromBook(bookToPatch) - - patch?.let { bPatch -> - metadataApplier.apply(bPatch, bookToPatch) - bookRepository.save(bookToPatch) - - bPatch.series?.let { sPatch -> - metadataApplier.apply(sPatch, bookToPatch.series) - seriesRepository.save(bookToPatch.series) - } + bPatch.series?.let { sPatch -> + metadataApplier.apply(sPatch, book.series) + seriesRepository.save(book.series) } } } diff --git a/komga/src/main/kotlin/org/gotson/komga/application/tasks/Task.kt b/komga/src/main/kotlin/org/gotson/komga/application/tasks/Task.kt new file mode 100644 index 000000000..5ffa7eaf0 --- /dev/null +++ b/komga/src/main/kotlin/org/gotson/komga/application/tasks/Task.kt @@ -0,0 +1,23 @@ +package org.gotson.komga.application.tasks + +import java.io.Serializable + +sealed class Task : Serializable { + abstract fun uniqueId(): String + + data class ScanLibrary(val libraryId: Long) : Task() { + override fun uniqueId() = "SCAN_LIBRARY_$libraryId" + } + + data class AnalyzeBook(val bookId: Long) : Task() { + override fun uniqueId() = "ANALYZE_BOOK_$bookId" + } + + data class GenerateBookThumbnail(val bookId: Long) : Task() { + override fun uniqueId() = "GENERATE_BOOK_THUMBNAIL_$bookId" + } + + data class RefreshBookMetadata(val bookId: Long) : Task() { + override fun uniqueId() = "REFRESH_BOOK_METADATA_$bookId" + } +} diff --git a/komga/src/main/kotlin/org/gotson/komga/application/tasks/TaskHandler.kt b/komga/src/main/kotlin/org/gotson/komga/application/tasks/TaskHandler.kt new file mode 100644 index 000000000..a1ef871bd --- /dev/null +++ b/komga/src/main/kotlin/org/gotson/komga/application/tasks/TaskHandler.kt @@ -0,0 +1,67 @@ +package org.gotson.komga.application.tasks + +import mu.KotlinLogging +import org.gotson.komga.application.service.BookLifecycle +import org.gotson.komga.application.service.MetadataLifecycle +import org.gotson.komga.domain.persistence.BookRepository +import org.gotson.komga.domain.persistence.LibraryRepository +import org.gotson.komga.domain.service.LibraryScanner +import org.gotson.komga.infrastructure.jms.QUEUE_TASKS +import org.gotson.komga.infrastructure.jms.QUEUE_TASKS_SELECTOR +import org.springframework.data.repository.findByIdOrNull +import org.springframework.jms.annotation.JmsListener +import org.springframework.stereotype.Service +import org.springframework.transaction.annotation.Transactional +import kotlin.time.measureTime + +private val logger = KotlinLogging.logger {} + +@Service +class TaskHandler( + private val taskReceiver: TaskReceiver, + private val libraryRepository: LibraryRepository, + private val bookRepository: BookRepository, + private val libraryScanner: LibraryScanner, + private val bookLifecycle: BookLifecycle, + private val metadataLifecycle: MetadataLifecycle +) { + + @JmsListener(destination = QUEUE_TASKS, selector = QUEUE_TASKS_SELECTOR) + @Transactional + fun handleTask(task: Task) { + logger.info { "Executing task: $task" } + try { + measureTime { + when (task) { + is Task.ScanLibrary -> + libraryRepository.findByIdOrNull(task.libraryId)?.let { + libraryScanner.scanRootFolder(it) + taskReceiver.analyzeUnknownBooks(it) + } ?: logger.warn { "Cannot execute task $task: Library does not exist" } + + is Task.AnalyzeBook -> + bookRepository.findByIdOrNull(task.bookId)?.let { + bookLifecycle.analyzeAndPersist(it) + taskReceiver.refreshBookMetadata(it) + } ?: logger.warn { "Cannot execute task $task: Book does not exist" } + + is Task.GenerateBookThumbnail -> + bookRepository.findByIdOrNull(task.bookId)?.let { + bookLifecycle.regenerateThumbnailAndPersist(it) + } ?: logger.warn { "Cannot execute task $task: Book does not exist" } + + is Task.RefreshBookMetadata -> + bookRepository.findByIdOrNull(task.bookId)?.let { + metadataLifecycle.refreshMetadata(it) + } ?: logger.warn { "Cannot execute task $task: Book does not exist" } + } + }.also { + logger.info { "Task $task executed in $it" } + } + } catch (e: Exception) { + logger.error(e) { "Task $task execution failed" } + } + } +} + + diff --git a/komga/src/main/kotlin/org/gotson/komga/application/tasks/TaskReceiver.kt b/komga/src/main/kotlin/org/gotson/komga/application/tasks/TaskReceiver.kt new file mode 100644 index 000000000..c108cce0d --- /dev/null +++ b/komga/src/main/kotlin/org/gotson/komga/application/tasks/TaskReceiver.kt @@ -0,0 +1,60 @@ +package org.gotson.komga.application.tasks + +import mu.KotlinLogging +import org.gotson.komga.domain.model.Book +import org.gotson.komga.domain.model.Library +import org.gotson.komga.domain.model.Media +import org.gotson.komga.domain.persistence.BookRepository +import org.gotson.komga.domain.persistence.LibraryRepository +import org.gotson.komga.infrastructure.jms.QUEUE_TASKS +import org.gotson.komga.infrastructure.jms.QUEUE_TASKS_TYPE +import org.gotson.komga.infrastructure.jms.QUEUE_TYPE +import org.gotson.komga.infrastructure.jms.QUEUE_UNIQUE_ID +import org.springframework.jms.core.JmsTemplate +import org.springframework.stereotype.Service + +private val logger = KotlinLogging.logger {} + +@Service +class TaskReceiver( + private val jmsTemplate: JmsTemplate, + private val libraryRepository: LibraryRepository, + private val bookRepository: BookRepository +) { + + fun scanLibraries() { + libraryRepository.findAll().forEach { scanLibrary(it) } + } + + fun scanLibrary(library: Library) { + submitTask(Task.ScanLibrary(library.id)) + } + + fun analyzeUnknownBooks(library: Library) { + bookRepository.findAllByMediaStatusAndSeriesLibrary(Media.Status.UNKNOWN, library).forEach { + submitTask(Task.AnalyzeBook(it.id)) + } + } + + fun analyzeBook(book: Book) { + submitTask(Task.AnalyzeBook(book.id)) + } + + fun generateBookThumbnail(book: Book) { + submitTask(Task.GenerateBookThumbnail(book.id)) + } + + fun refreshBookMetadata(book: Book) { + submitTask(Task.RefreshBookMetadata(book.id)) + } + + private fun submitTask(task: Task) { + logger.info { "Sending task: $task" } + jmsTemplate.convertAndSend(QUEUE_TASKS, task) { + it.apply { + setStringProperty(QUEUE_TYPE, QUEUE_TASKS_TYPE) + setStringProperty(QUEUE_UNIQUE_ID, task.uniqueId()) + } + } + } +} diff --git a/komga/src/main/kotlin/org/gotson/komga/domain/persistence/BookRepository.kt b/komga/src/main/kotlin/org/gotson/komga/domain/persistence/BookRepository.kt index a2727d9f8..fcf5c7671 100644 --- a/komga/src/main/kotlin/org/gotson/komga/domain/persistence/BookRepository.kt +++ b/komga/src/main/kotlin/org/gotson/komga/domain/persistence/BookRepository.kt @@ -28,8 +28,9 @@ interface BookRepository : JpaRepository, JpaSpecificationExecutor, pageable: Pageable): Page fun findBySeriesLibraryIn(seriesLibrary: Collection): List + fun findBySeriesLibrary(seriesLibrary: Library): List fun findByUrl(url: URL): Book? - fun findAllByMediaStatus(status: Media.Status): List + fun findAllByMediaStatusAndSeriesLibrary(status: Media.Status, library: Library): List fun findAllByMediaThumbnailIsNull(): List } diff --git a/komga/src/main/kotlin/org/gotson/komga/domain/service/LibraryScanner.kt b/komga/src/main/kotlin/org/gotson/komga/domain/service/LibraryScanner.kt index 4bc040f8b..4161fcc6d 100644 --- a/komga/src/main/kotlin/org/gotson/komga/domain/service/LibraryScanner.kt +++ b/komga/src/main/kotlin/org/gotson/komga/domain/service/LibraryScanner.kt @@ -1,18 +1,13 @@ package org.gotson.komga.domain.service import mu.KotlinLogging -import org.apache.commons.lang3.time.DurationFormatUtils -import org.gotson.komga.application.service.BookLifecycle -import org.gotson.komga.application.service.MetadataLifecycle import org.gotson.komga.domain.model.Library -import org.gotson.komga.domain.model.Media import org.gotson.komga.domain.persistence.BookRepository import org.gotson.komga.domain.persistence.SeriesRepository import org.springframework.stereotype.Service import org.springframework.transaction.annotation.Transactional import java.nio.file.Paths import java.time.temporal.ChronoUnit -import kotlin.system.measureTimeMillis private val logger = KotlinLogging.logger {} @@ -20,86 +15,57 @@ private val logger = KotlinLogging.logger {} class LibraryScanner( private val fileSystemScanner: FileSystemScanner, private val seriesRepository: SeriesRepository, - private val bookRepository: BookRepository, - private val bookLifecycle: BookLifecycle, - private val metadataLifecycle: MetadataLifecycle + private val bookRepository: BookRepository ) { @Transactional fun scanRootFolder(library: Library) { logger.info { "Updating library: ${library.name}, root folder: ${library.root}" } - measureTimeMillis { - val scannedSeries = fileSystemScanner.scanRootFolder(Paths.get(library.root.toURI())) + val scannedSeries = fileSystemScanner.scanRootFolder(Paths.get(library.root.toURI())) - // delete series that don't exist anymore - if (scannedSeries.isEmpty()) { - logger.info { "Scan returned no series, deleting all existing series" } - seriesRepository.deleteByLibraryId(library.id) + // delete series that don't exist anymore + if (scannedSeries.isEmpty()) { + logger.info { "Scan returned no series, deleting all existing series" } + seriesRepository.deleteByLibraryId(library.id) + } else { + scannedSeries.map { it.url }.let { urls -> + seriesRepository.findByLibraryIdAndUrlNotIn(library.id, urls).forEach { + logger.info { "Deleting series not on disk anymore: $it" } + seriesRepository.delete(it) + } + } + } + + scannedSeries.forEach { newSeries -> + val existingSeries = seriesRepository.findByLibraryIdAndUrl(library.id, newSeries.url) + + // if series does not exist, save it + if (existingSeries == null) { + logger.info { "Adding new series: $newSeries" } + seriesRepository.save(newSeries.also { it.library = library }) } else { - scannedSeries.map { it.url }.let { urls -> - seriesRepository.findByLibraryIdAndUrlNotIn(library.id, urls).forEach { - logger.info { "Deleting series not on disk anymore: $it" } - seriesRepository.delete(it) - } + // if series already exists, update it + if (newSeries.fileLastModified.truncatedTo(ChronoUnit.MILLIS) != existingSeries.fileLastModified.truncatedTo(ChronoUnit.MILLIS)) { + logger.info { "Series changed on disk, updating: $newSeries" } + existingSeries.fileLastModified = newSeries.fileLastModified + + // update list of books with existing entities if they exist + existingSeries.books = newSeries.books.map { newBook -> + val existingBook = bookRepository.findByUrl(newBook.url) ?: newBook + + if (newBook.fileLastModified.truncatedTo(ChronoUnit.MILLIS) != existingBook.fileLastModified.truncatedTo(ChronoUnit.MILLIS)) { + logger.info { "Book changed on disk, update and reset media status: $newBook" } + existingBook.fileLastModified = newBook.fileLastModified + existingBook.fileSize = newBook.fileSize + existingBook.media.reset() + } + existingBook + }.toMutableList() + + seriesRepository.save(existingSeries) } } - - scannedSeries.forEach { newSeries -> - val existingSeries = seriesRepository.findByLibraryIdAndUrl(library.id, newSeries.url) - - // if series does not exist, save it - if (existingSeries == null) { - logger.info { "Adding new series: $newSeries" } - seriesRepository.save(newSeries.also { it.library = library }) - } else { - // if series already exists, update it - if (newSeries.fileLastModified.truncatedTo(ChronoUnit.MILLIS) != existingSeries.fileLastModified.truncatedTo(ChronoUnit.MILLIS)) { - logger.info { "Series changed on disk, updating: $newSeries" } - existingSeries.fileLastModified = newSeries.fileLastModified - - // update list of books with existing entities if they exist - existingSeries.books = newSeries.books.map { newBook -> - val existingBook = bookRepository.findByUrl(newBook.url) ?: newBook - - if (newBook.fileLastModified.truncatedTo(ChronoUnit.MILLIS) != existingBook.fileLastModified.truncatedTo(ChronoUnit.MILLIS)) { - logger.info { "Book changed on disk, update and reset media status: $newBook" } - existingBook.fileLastModified = newBook.fileLastModified - existingBook.fileSize = newBook.fileSize - existingBook.media.reset() - } - existingBook - }.toMutableList() - - seriesRepository.save(existingSeries) - } - } - } - }.also { logger.info { "Library update finished in ${DurationFormatUtils.formatDurationHMS(it)}" } } - } - - fun analyzeUnknownBooks() { - logger.info { "Analyze all books in status: unknown" } - val booksToAnalyze = bookRepository.findAllByMediaStatus(Media.Status.UNKNOWN) - - var sumOfTasksTime = 0L - measureTimeMillis { - sumOfTasksTime = booksToAnalyze - .map { bookLifecycle.analyzeAndPersist(it) } - .map { - try { - it.get() - } catch (ex: Exception) { - 0L - } - } - .sum() - }.also { - logger.info { "Analyzed ${booksToAnalyze.size} books in ${DurationFormatUtils.formatDurationHMS(it)} (virtual: ${DurationFormatUtils.formatDurationHMS(sumOfTasksTime)})" } - } - - logger.info { "Refresh metadata for all books analyzed" } - booksToAnalyze.forEach { - metadataLifecycle.refreshMetadata(it) } } + } diff --git a/komga/src/main/kotlin/org/gotson/komga/infrastructure/async/AsyncConfiguration.kt b/komga/src/main/kotlin/org/gotson/komga/infrastructure/async/AsyncConfiguration.kt deleted file mode 100644 index d34c907db..000000000 --- a/komga/src/main/kotlin/org/gotson/komga/infrastructure/async/AsyncConfiguration.kt +++ /dev/null @@ -1,55 +0,0 @@ -package org.gotson.komga.infrastructure.async - -import org.gotson.komga.infrastructure.configuration.KomgaProperties -import org.springframework.context.annotation.Bean -import org.springframework.context.annotation.Configuration -import org.springframework.scheduling.annotation.EnableAsync -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor -import java.util.concurrent.Executor - -@Configuration -@EnableAsync -class AsyncConfiguration( - private val komgaProperties: KomgaProperties -) { - - @Bean("analyzeBookTaskExecutor") - fun analyzeBookTaskExecutor(): Executor = - ThreadPoolTaskExecutor().apply { - corePoolSize = komgaProperties.threads.analyzer - } - - @Bean("periodicScanTaskExecutor") - fun periodicScanTaskExecutor(): Executor = - ThreadPoolTaskExecutor().apply { - corePoolSize = 1 - maxPoolSize = 1 - setQueueCapacity(0) - } - - @Bean("regenerateThumbnailsTaskExecutor") - fun regenerateThumbnailsTaskExecutor(): Executor = - ThreadPoolTaskExecutor().apply { - corePoolSize = 1 - maxPoolSize = 1 - setQueueCapacity(0) - } - - @Bean("reAnalyzeBooksTaskExecutor") - fun reAnalyzeBooksTaskExecutor(): Executor = - ThreadPoolTaskExecutor().apply { - corePoolSize = 1 - } - - @Bean("refreshMetadataTaskExecutor") - fun refreshMetadataTaskExecutor(): Executor = - ThreadPoolTaskExecutor().apply { - corePoolSize = 1 - } - - @Bean("reRefreshMetadataTaskExecutor") - fun reRefreshMetadataTaskExecutor(): Executor = - ThreadPoolTaskExecutor().apply { - corePoolSize = 1 - } -} diff --git a/komga/src/main/kotlin/org/gotson/komga/infrastructure/jms/ArtemisConfig.kt b/komga/src/main/kotlin/org/gotson/komga/infrastructure/jms/ArtemisConfig.kt new file mode 100644 index 000000000..b910c23e7 --- /dev/null +++ b/komga/src/main/kotlin/org/gotson/komga/infrastructure/jms/ArtemisConfig.kt @@ -0,0 +1,32 @@ +package org.gotson.komga.infrastructure.jms + +import org.apache.activemq.artemis.api.core.RoutingType +import org.apache.activemq.artemis.core.config.CoreQueueConfiguration +import org.apache.activemq.artemis.core.settings.impl.AddressSettings +import org.springframework.boot.autoconfigure.jms.artemis.ArtemisConfigurationCustomizer +import org.springframework.context.annotation.Configuration + +const val QUEUE_UNIQUE_ID = "unique_id" +const val QUEUE_TYPE = "type" +const val QUEUE_TASKS = "tasks.background" +const val QUEUE_TASKS_TYPE = "task" +const val QUEUE_TASKS_SELECTOR = "$QUEUE_TYPE = '$QUEUE_TASKS_TYPE'" + +@Configuration +class ArtemisConfig : ArtemisConfigurationCustomizer { + override fun customize(configuration: org.apache.activemq.artemis.core.config.Configuration?) { + configuration?.let { + // disable prefetch, ensures messages stay in the queue and last value can have desired effect + it.addAddressesSetting(QUEUE_TASKS, AddressSettings().apply { + defaultConsumerWindowSize = 0 + }) + it.addQueueConfiguration( + CoreQueueConfiguration() + .setAddress(QUEUE_TASKS) + .setName(QUEUE_TASKS) + .setLastValueKey(QUEUE_UNIQUE_ID) + .setRoutingType(RoutingType.ANYCAST) + ) + } + } +} diff --git a/komga/src/main/kotlin/org/gotson/komga/interfaces/rest/AdminController.kt b/komga/src/main/kotlin/org/gotson/komga/interfaces/rest/AdminController.kt index 0dfb9169a..a18ae43d1 100644 --- a/komga/src/main/kotlin/org/gotson/komga/interfaces/rest/AdminController.kt +++ b/komga/src/main/kotlin/org/gotson/komga/interfaces/rest/AdminController.kt @@ -1,7 +1,7 @@ package org.gotson.komga.interfaces.rest import mu.KotlinLogging -import org.gotson.komga.application.service.AsyncOrchestrator +import org.gotson.komga.application.tasks.TaskReceiver import org.gotson.komga.domain.persistence.BookRepository import org.springframework.http.HttpStatus import org.springframework.security.access.prepost.PreAuthorize @@ -9,8 +9,6 @@ import org.springframework.web.bind.annotation.PostMapping import org.springframework.web.bind.annotation.RequestMapping import org.springframework.web.bind.annotation.ResponseStatus import org.springframework.web.bind.annotation.RestController -import org.springframework.web.server.ResponseStatusException -import java.util.concurrent.RejectedExecutionException private val logger = KotlinLogging.logger {} @@ -18,29 +16,21 @@ private val logger = KotlinLogging.logger {} @RequestMapping("api/v1/admin") @PreAuthorize("hasRole('ADMIN')") class AdminController( - private val asyncOrchestrator: AsyncOrchestrator, - private val bookRepository: BookRepository + private val bookRepository: BookRepository, + private val taskReceiver: TaskReceiver ) { @PostMapping("rpc/thumbnails/regenerate/all") @ResponseStatus(HttpStatus.ACCEPTED) fun regenerateAllThumbnails() { - try { - logger.info { "Regenerate thumbnail for all books" } - asyncOrchestrator.generateThumbnails(bookRepository.findAll()) - } catch (e: RejectedExecutionException) { - throw ResponseStatusException(HttpStatus.SERVICE_UNAVAILABLE, "Thumbnail regeneration task is already running") - } + logger.info { "Regenerate thumbnail for all books" } + bookRepository.findAll().forEach { taskReceiver.generateBookThumbnail(it) } } @PostMapping("rpc/thumbnails/regenerate/missing") @ResponseStatus(HttpStatus.ACCEPTED) fun regenerateMissingThumbnails() { - try { - logger.info { "Regenerate missing thumbnails" } - asyncOrchestrator.generateThumbnails(bookRepository.findAllByMediaThumbnailIsNull()) - } catch (e: RejectedExecutionException) { - throw ResponseStatusException(HttpStatus.SERVICE_UNAVAILABLE, "Thumbnail regeneration task is already running") - } + logger.info { "Regenerate missing thumbnails" } + bookRepository.findAllByMediaThumbnailIsNull().forEach { taskReceiver.generateBookThumbnail(it) } } } diff --git a/komga/src/main/kotlin/org/gotson/komga/interfaces/rest/BookController.kt b/komga/src/main/kotlin/org/gotson/komga/interfaces/rest/BookController.kt index 0de84a146..3b023a411 100644 --- a/komga/src/main/kotlin/org/gotson/komga/interfaces/rest/BookController.kt +++ b/komga/src/main/kotlin/org/gotson/komga/interfaces/rest/BookController.kt @@ -9,8 +9,8 @@ import io.swagger.v3.oas.annotations.media.Content import io.swagger.v3.oas.annotations.media.Schema import io.swagger.v3.oas.annotations.responses.ApiResponse import mu.KotlinLogging -import org.gotson.komga.application.service.AsyncOrchestrator import org.gotson.komga.application.service.BookLifecycle +import org.gotson.komga.application.tasks.TaskReceiver import org.gotson.komga.domain.model.Author import org.gotson.komga.domain.model.Book import org.gotson.komga.domain.model.BookMetadata @@ -57,7 +57,6 @@ import org.springframework.web.server.ResponseStatusException import java.io.FileNotFoundException import java.nio.file.NoSuchFileException import java.time.ZoneOffset -import java.util.concurrent.RejectedExecutionException import java.util.concurrent.TimeUnit import javax.persistence.criteria.JoinType import javax.validation.Valid @@ -69,7 +68,7 @@ private val logger = KotlinLogging.logger {} class BookController( private val bookRepository: BookRepository, private val bookLifecycle: BookLifecycle, - private val asyncOrchestrator: AsyncOrchestrator + private val taskReceiver: TaskReceiver ) { @PageableAsQueryParam @@ -345,11 +344,7 @@ class BookController( @ResponseStatus(HttpStatus.ACCEPTED) fun analyze(@PathVariable bookId: Long) { bookRepository.findByIdOrNull(bookId)?.let { book -> - try { - asyncOrchestrator.reAnalyzeBooks(listOf(book)) - } catch (e: RejectedExecutionException) { - throw ResponseStatusException(HttpStatus.SERVICE_UNAVAILABLE, "Another book analysis task is already running") - } + taskReceiver.analyzeBook(book) } ?: throw ResponseStatusException(HttpStatus.NOT_FOUND) } @@ -358,11 +353,7 @@ class BookController( @ResponseStatus(HttpStatus.ACCEPTED) fun refreshMetadata(@PathVariable bookId: Long) { bookRepository.findByIdOrNull(bookId)?.let { book -> - try { - asyncOrchestrator.refreshBooksMetadata(listOf(book)) - } catch (e: RejectedExecutionException) { - throw ResponseStatusException(HttpStatus.SERVICE_UNAVAILABLE, "Another metadata refresh task is already running") - } + taskReceiver.refreshBookMetadata(book) } ?: throw ResponseStatusException(HttpStatus.NOT_FOUND) } diff --git a/komga/src/main/kotlin/org/gotson/komga/interfaces/rest/LibraryController.kt b/komga/src/main/kotlin/org/gotson/komga/interfaces/rest/LibraryController.kt index e689232b0..b3e376eef 100644 --- a/komga/src/main/kotlin/org/gotson/komga/interfaces/rest/LibraryController.kt +++ b/komga/src/main/kotlin/org/gotson/komga/interfaces/rest/LibraryController.kt @@ -1,8 +1,8 @@ package org.gotson.komga.interfaces.rest import mu.KotlinLogging -import org.gotson.komga.application.service.AsyncOrchestrator import org.gotson.komga.application.service.LibraryLifecycle +import org.gotson.komga.application.tasks.TaskReceiver import org.gotson.komga.domain.model.DirectoryNotFoundException import org.gotson.komga.domain.model.DuplicateNameException import org.gotson.komga.domain.model.Library @@ -25,7 +25,6 @@ import org.springframework.web.bind.annotation.ResponseStatus import org.springframework.web.bind.annotation.RestController import org.springframework.web.server.ResponseStatusException import java.io.FileNotFoundException -import java.util.concurrent.RejectedExecutionException import javax.validation.Valid import javax.validation.constraints.NotBlank @@ -37,7 +36,7 @@ class LibraryController( private val libraryLifecycle: LibraryLifecycle, private val libraryRepository: LibraryRepository, private val bookRepository: BookRepository, - private val asyncOrchestrator: AsyncOrchestrator + private val taskReceiver: TaskReceiver ) { @GetMapping @@ -93,11 +92,7 @@ class LibraryController( @ResponseStatus(HttpStatus.ACCEPTED) fun scan(@PathVariable libraryId: Long) { libraryRepository.findByIdOrNull(libraryId)?.let { library -> - try { - asyncOrchestrator.scanAndAnalyzeOneLibrary(library) - } catch (e: RejectedExecutionException) { - throw ResponseStatusException(HttpStatus.SERVICE_UNAVAILABLE, "Another scan task is already running") - } + taskReceiver.scanLibrary(library) } ?: throw ResponseStatusException(HttpStatus.NOT_FOUND) } @@ -106,11 +101,7 @@ class LibraryController( @ResponseStatus(HttpStatus.ACCEPTED) fun analyze(@PathVariable libraryId: Long) { libraryRepository.findByIdOrNull(libraryId)?.let { library -> - try { - asyncOrchestrator.reAnalyzeBooks(bookRepository.findBySeriesLibraryIn(listOf(library))) - } catch (e: RejectedExecutionException) { - throw ResponseStatusException(HttpStatus.SERVICE_UNAVAILABLE, "Another book analysis task is already running") - } + bookRepository.findBySeriesLibrary(library).forEach { taskReceiver.analyzeBook(it) } } ?: throw ResponseStatusException(HttpStatus.NOT_FOUND) } @@ -119,11 +110,7 @@ class LibraryController( @ResponseStatus(HttpStatus.ACCEPTED) fun refreshMetadata(@PathVariable libraryId: Long) { libraryRepository.findByIdOrNull(libraryId)?.let { library -> - try { - asyncOrchestrator.refreshBooksMetadata(bookRepository.findBySeriesLibraryIn(listOf(library))) - } catch (e: RejectedExecutionException) { - throw ResponseStatusException(HttpStatus.SERVICE_UNAVAILABLE, "Another metadata refresh task is already running") - } + bookRepository.findBySeriesLibrary(library).forEach { taskReceiver.refreshBookMetadata(it) } } ?: throw ResponseStatusException(HttpStatus.NOT_FOUND) } } diff --git a/komga/src/main/kotlin/org/gotson/komga/interfaces/rest/SeriesController.kt b/komga/src/main/kotlin/org/gotson/komga/interfaces/rest/SeriesController.kt index 399de64f6..97e7585a0 100644 --- a/komga/src/main/kotlin/org/gotson/komga/interfaces/rest/SeriesController.kt +++ b/komga/src/main/kotlin/org/gotson/komga/interfaces/rest/SeriesController.kt @@ -9,7 +9,7 @@ import io.swagger.v3.oas.annotations.media.Content import io.swagger.v3.oas.annotations.media.Schema import io.swagger.v3.oas.annotations.responses.ApiResponse import mu.KotlinLogging -import org.gotson.komga.application.service.AsyncOrchestrator +import org.gotson.komga.application.tasks.TaskReceiver import org.gotson.komga.domain.model.Library import org.gotson.komga.domain.model.Media import org.gotson.komga.domain.model.Series @@ -45,7 +45,6 @@ import org.springframework.web.bind.annotation.RequestParam import org.springframework.web.bind.annotation.ResponseStatus import org.springframework.web.bind.annotation.RestController import org.springframework.web.server.ResponseStatusException -import java.util.concurrent.RejectedExecutionException import javax.validation.Valid private val logger = KotlinLogging.logger {} @@ -56,7 +55,7 @@ class SeriesController( private val seriesRepository: SeriesRepository, private val bookRepository: BookRepository, private val bookController: BookController, - private val asyncOrchestrator: AsyncOrchestrator + private val taskReceiver: TaskReceiver ) { @GetMapping @@ -222,11 +221,7 @@ class SeriesController( @ResponseStatus(HttpStatus.ACCEPTED) fun analyze(@PathVariable seriesId: Long) { seriesRepository.findByIdOrNull(seriesId)?.let { series -> - try { - asyncOrchestrator.reAnalyzeBooks(series.books) - } catch (e: RejectedExecutionException) { - throw ResponseStatusException(HttpStatus.SERVICE_UNAVAILABLE, "Another book analysis task is already running") - } + series.books.forEach { taskReceiver.analyzeBook(it) } } ?: throw ResponseStatusException(HttpStatus.NOT_FOUND) } @@ -235,11 +230,7 @@ class SeriesController( @ResponseStatus(HttpStatus.ACCEPTED) fun refreshMetadata(@PathVariable seriesId: Long) { seriesRepository.findByIdOrNull(seriesId)?.let { series -> - try { - asyncOrchestrator.refreshBooksMetadata(series.books) - } catch (e: RejectedExecutionException) { - throw ResponseStatusException(HttpStatus.SERVICE_UNAVAILABLE, "Another metadata refresh task is already running") - } + series.books.forEach { taskReceiver.refreshBookMetadata(it) } } ?: throw ResponseStatusException(HttpStatus.NOT_FOUND) } diff --git a/komga/src/main/kotlin/org/gotson/komga/interfaces/scheduler/PeriodicScannerController.kt b/komga/src/main/kotlin/org/gotson/komga/interfaces/scheduler/PeriodicScannerController.kt index ff598196c..5c0c6f263 100644 --- a/komga/src/main/kotlin/org/gotson/komga/interfaces/scheduler/PeriodicScannerController.kt +++ b/komga/src/main/kotlin/org/gotson/komga/interfaces/scheduler/PeriodicScannerController.kt @@ -1,29 +1,24 @@ package org.gotson.komga.interfaces.scheduler import mu.KotlinLogging -import org.gotson.komga.application.service.AsyncOrchestrator +import org.gotson.komga.application.tasks.TaskReceiver import org.springframework.boot.context.event.ApplicationReadyEvent import org.springframework.context.annotation.Profile import org.springframework.context.event.EventListener import org.springframework.scheduling.annotation.Scheduled import org.springframework.stereotype.Controller -import java.util.concurrent.RejectedExecutionException private val logger = KotlinLogging.logger {} @Profile("!test") @Controller class PeriodicScannerController( - private val asyncOrchestrator: AsyncOrchestrator + private val taskReceiver: TaskReceiver ) { @EventListener(ApplicationReadyEvent::class) @Scheduled(cron = "#{@komgaProperties.librariesScanCron ?: '-'}") - fun scanRootFolder() { - try { - asyncOrchestrator.scanAndAnalyzeAllLibraries() - } catch (e: RejectedExecutionException) { - logger.warn { "Another scan is already running, skipping" } - } + fun scanAllLibraries() { + taskReceiver.scanLibraries() } } diff --git a/komga/src/main/resources/application-dev.yml b/komga/src/main/resources/application-dev.yml index 11abc3adb..6ecbcad82 100644 --- a/komga/src/main/resources/application-dev.yml +++ b/komga/src/main/resources/application-dev.yml @@ -21,14 +21,15 @@ logging: max-history: 1 name: komga-dev.log level: - org.gotson.komga: DEBUG - web: DEBUG - # org.hibernate: - # stat: DEBUG - # SQL: DEBUG - # cache: DEBUG - # type.descriptor.sql.BasicBinder: TRACE - # org.springframework.security.web.FilterChainProxy: DEBUG + org.apache.activemq.audit.message: WARN +# web: DEBUG +# org.gotson.komga: DEBUG +# org.springframework.jms: DEBUG +# org.springframework.security.web.FilterChainProxy: DEBUG +# org.hibernate.stat: DEBUG +# org.hibernate.SQL: DEBUG +# org.hibernate.cache: DEBUG +# org.hibernate.type.descriptor.sql.BasicBinder: TRACE management.metrics.export.influx: # enabled: true diff --git a/komga/src/main/resources/application.yml b/komga/src/main/resources/application.yml index 2fce27448..815d22159 100644 --- a/komga/src/main/resources/application.yml +++ b/komga/src/main/resources/application.yml @@ -4,6 +4,8 @@ logging: file: max-history: 10 name: komga.log + level: + org.apache.activemq.audit.message: WARN komga: libraries-scan-cron: "0 */15 * * * ?" diff --git a/komga/src/test/kotlin/org/gotson/komga/domain/service/LibraryScannerTest.kt b/komga/src/test/kotlin/org/gotson/komga/domain/service/LibraryScannerTest.kt index 651dc0280..2575846e4 100644 --- a/komga/src/test/kotlin/org/gotson/komga/domain/service/LibraryScannerTest.kt +++ b/komga/src/test/kotlin/org/gotson/komga/domain/service/LibraryScannerTest.kt @@ -196,7 +196,7 @@ class LibraryScannerTest( libraryScanner.scanRootFolder(library) every { mockAnalyzer.analyze(any()) } returns Media(status = Media.Status.READY, mediaType = "application/zip", pages = mutableListOf(makeBookPage("1.jpg"), makeBookPage("2.jpg"))) - bookRepository.findAll().map { bookLifecycle.analyzeAndPersist(it) }.map { it.get() } + bookRepository.findAll().map { bookLifecycle.analyzeAndPersist(it) } // when libraryScanner.scanRootFolder(library) diff --git a/komga/src/test/kotlin/org/gotson/komga/infrastructure/jms/ArtemisConfigTest.kt b/komga/src/test/kotlin/org/gotson/komga/infrastructure/jms/ArtemisConfigTest.kt new file mode 100644 index 000000000..5bacd3771 --- /dev/null +++ b/komga/src/test/kotlin/org/gotson/komga/infrastructure/jms/ArtemisConfigTest.kt @@ -0,0 +1,95 @@ +package org.gotson.komga.infrastructure.jms + +import mu.KotlinLogging +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.ExtendWith +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.test.context.SpringBootTest +import org.springframework.jms.core.JmsTemplate +import org.springframework.jms.support.destination.JmsDestinationAccessor +import org.springframework.test.context.junit.jupiter.SpringExtension +import javax.jms.QueueBrowser +import javax.jms.Session + +private val logger = KotlinLogging.logger {} + +@ExtendWith(SpringExtension::class) +@SpringBootTest +class ArtemisConfigTest( + @Autowired private val jmsTemplate: JmsTemplate +) { + + init { + jmsTemplate.receiveTimeout = JmsDestinationAccessor.RECEIVE_TIMEOUT_NO_WAIT + } + + @BeforeEach + fun emptyQueue() { + while (jmsTemplate.receive(QUEUE_TASKS) != null) { + logger.info { "Emptying queue" } + } + } + + @Test + fun `when sending messages with the same unique id then messages are deduplicated`() { + for (i in 1..5) { + jmsTemplate.convertAndSend( + QUEUE_TASKS, + "message $i" + ) { + it.apply { setStringProperty(QUEUE_UNIQUE_ID, "1") } + } + } + + val size = jmsTemplate.browse(QUEUE_TASKS) { _: Session, browser: QueueBrowser -> + browser.enumeration.toList().size + } + + val msg = jmsTemplate.receiveAndConvert(QUEUE_TASKS) as String + + assertThat(msg).isEqualTo("message 5") + assertThat(size).isEqualTo(1) + } + + @Test + fun `when sending messages with some common unique id then messages are deduplicated`() { + for (i in 1..6) { + jmsTemplate.convertAndSend( + QUEUE_TASKS, + "message $i" + ) { + it.apply { setStringProperty(QUEUE_UNIQUE_ID, i.rem(2).toString()) } + } + } + + val size = jmsTemplate.browse(QUEUE_TASKS) { _: Session, browser: QueueBrowser -> + browser.enumeration.toList().size + } + + val msg = jmsTemplate.receiveAndConvert(QUEUE_TASKS) as String + + assertThat(msg).isEqualTo("message 5") + assertThat(size).isEqualTo(2) + } + + @Test + fun `when sending messages without unique id then messages are not deduplicated`() { + for (i in 1..5) { + jmsTemplate.convertAndSend( + QUEUE_TASKS, + "message $i" + ) + } + + val size = jmsTemplate.browse(QUEUE_TASKS) { _: Session, browser: QueueBrowser -> + browser.enumeration.toList().size + } + + val msg = jmsTemplate.receiveAndConvert(QUEUE_TASKS) as String + + assertThat(msg).isEqualTo("message 1") + assertThat(size).isEqualTo(5) + } +}