From 60ce87a25dd62a2d357ce3b62a03f03bbce43f1c Mon Sep 17 00:00:00 2001 From: Gauthier Roebroeck Date: Fri, 24 Apr 2020 15:02:09 +0800 Subject: [PATCH] feat(tasks): replace background tasks management Use of Apache Artemis message queue instead async methods with executors. Tasks are submitted to TaskReceiver, and handled by TaskHandler. The Artemis queue is configured with last-value, which automatically remove duplicate tasks in the queue. --- komga/build.gradle.kts | 6 +- .../application/service/AsyncOrchestrator.kt | 89 ------------- .../application/service/BookLifecycle.kt | 46 +++---- .../application/service/LibraryLifecycle.kt | 13 +- .../application/service/MetadataLifecycle.kt | 27 ++-- .../gotson/komga/application/tasks/Task.kt | 23 ++++ .../komga/application/tasks/TaskHandler.kt | 67 ++++++++++ .../komga/application/tasks/TaskReceiver.kt | 60 +++++++++ .../domain/persistence/BookRepository.kt | 3 +- .../komga/domain/service/LibraryScanner.kt | 118 +++++++----------- .../async/AsyncConfiguration.kt | 55 -------- .../komga/infrastructure/jms/ArtemisConfig.kt | 32 +++++ .../komga/interfaces/rest/AdminController.kt | 24 ++-- .../komga/interfaces/rest/BookController.kt | 17 +-- .../interfaces/rest/LibraryController.kt | 23 +--- .../komga/interfaces/rest/SeriesController.kt | 17 +-- .../scheduler/PeriodicScannerController.kt | 13 +- komga/src/main/resources/application-dev.yml | 17 +-- komga/src/main/resources/application.yml | 2 + .../domain/service/LibraryScannerTest.kt | 2 +- .../infrastructure/jms/ArtemisConfigTest.kt | 95 ++++++++++++++ 21 files changed, 388 insertions(+), 361 deletions(-) delete mode 100644 komga/src/main/kotlin/org/gotson/komga/application/service/AsyncOrchestrator.kt create mode 100644 komga/src/main/kotlin/org/gotson/komga/application/tasks/Task.kt create mode 100644 komga/src/main/kotlin/org/gotson/komga/application/tasks/TaskHandler.kt create mode 100644 komga/src/main/kotlin/org/gotson/komga/application/tasks/TaskReceiver.kt delete mode 100644 komga/src/main/kotlin/org/gotson/komga/infrastructure/async/AsyncConfiguration.kt create mode 100644 komga/src/main/kotlin/org/gotson/komga/infrastructure/jms/ArtemisConfig.kt create mode 100644 komga/src/test/kotlin/org/gotson/komga/infrastructure/jms/ArtemisConfigTest.kt diff --git a/komga/build.gradle.kts b/komga/build.gradle.kts index 3f7ad0c49..84c7d63ff 100644 --- a/komga/build.gradle.kts +++ b/komga/build.gradle.kts @@ -38,9 +38,12 @@ dependencies { implementation("org.springframework.boot:spring-boot-starter-cache") implementation("org.springframework.boot:spring-boot-starter-security") implementation("org.springframework.boot:spring-boot-starter-thymeleaf") + implementation("org.springframework.boot:spring-boot-starter-artemis") kapt("org.springframework.boot:spring-boot-configuration-processor") + implementation("org.apache.activemq:artemis-jms-server") + implementation("org.flywaydb:flyway-core") implementation("org.hibernate:hibernate-jcache") @@ -93,6 +96,7 @@ dependencies { testImplementation("com.tngtech.archunit:archunit-junit5:0.13.1") + developmentOnly("org.springframework.boot:spring-boot-devtools") } @@ -100,7 +104,7 @@ tasks { withType { kotlinOptions { jvmTarget = "1.8" - freeCompilerArgs = listOf("-Xjsr305=strict") + freeCompilerArgs = listOf("-Xjsr305=strict", "-Xopt-in=kotlin.time.ExperimentalTime") } } diff --git a/komga/src/main/kotlin/org/gotson/komga/application/service/AsyncOrchestrator.kt b/komga/src/main/kotlin/org/gotson/komga/application/service/AsyncOrchestrator.kt deleted file mode 100644 index f6add9207..000000000 --- a/komga/src/main/kotlin/org/gotson/komga/application/service/AsyncOrchestrator.kt +++ /dev/null @@ -1,89 +0,0 @@ -package org.gotson.komga.application.service - -import mu.KotlinLogging -import org.apache.commons.lang3.time.DurationFormatUtils -import org.gotson.komga.domain.model.Book -import org.gotson.komga.domain.model.Library -import org.gotson.komga.domain.persistence.BookRepository -import org.gotson.komga.domain.persistence.LibraryRepository -import org.gotson.komga.domain.persistence.SeriesRepository -import org.gotson.komga.domain.service.LibraryScanner -import org.springframework.scheduling.annotation.Async -import org.springframework.stereotype.Service -import org.springframework.transaction.annotation.Transactional -import kotlin.system.measureTimeMillis - -private val logger = KotlinLogging.logger {} - -@Service -class AsyncOrchestrator( - private val libraryScanner: LibraryScanner, - private val libraryRepository: LibraryRepository, - private val bookRepository: BookRepository, - private val bookLifecycle: BookLifecycle, - private val seriesRepository: SeriesRepository, - private val metadataLifecycle: MetadataLifecycle -) { - - @Async("periodicScanTaskExecutor") - fun scanAndAnalyzeAllLibraries() { - logger.info { "Starting periodic libraries scan" } - val libraries = libraryRepository.findAll() - - if (libraries.isEmpty()) { - logger.info { "No libraries defined, nothing to scan" } - } else { - libraries.forEach { - libraryScanner.scanRootFolder(it) - } - - logger.info { "Starting periodic book parsing" } - libraryScanner.analyzeUnknownBooks() - } - } - - @Async("periodicScanTaskExecutor") - fun scanAndAnalyzeOneLibrary(library: Library) { - libraryScanner.scanRootFolder(library) - libraryScanner.analyzeUnknownBooks() - } - - @Async("regenerateThumbnailsTaskExecutor") - @Transactional - fun generateThumbnails(books: List) { - val loadedBooks = bookRepository.findAllById(books.map { it.id }) - var sumOfTasksTime = 0L - measureTimeMillis { - sumOfTasksTime = loadedBooks - .map { bookLifecycle.regenerateThumbnailAndPersist(it) } - .map { - try { - it.get() - } catch (ex: Exception) { - 0L - } - } - .sum() - }.also { - logger.info { "Generated ${loadedBooks.size} thumbnails in ${DurationFormatUtils.formatDurationHMS(it)} (virtual: ${DurationFormatUtils.formatDurationHMS(sumOfTasksTime)})" } - } - } - - @Async("reAnalyzeBooksTaskExecutor") - @Transactional - fun reAnalyzeBooks(books: List) { - val loadedBooks = bookRepository.findAllById(books.map { it.id }) - loadedBooks.forEach { it.media.reset() } - bookRepository.saveAll(loadedBooks) - - loadedBooks.map { bookLifecycle.analyzeAndPersist(it) } - } - - @Async("reRefreshMetadataTaskExecutor") - @Transactional - fun refreshBooksMetadata(books: List) { - bookRepository - .findAllById(books.map { it.id }) - .forEach { metadataLifecycle.refreshMetadata(it) } - } -} diff --git a/komga/src/main/kotlin/org/gotson/komga/application/service/BookLifecycle.kt b/komga/src/main/kotlin/org/gotson/komga/application/service/BookLifecycle.kt index ed1f4468c..9657006ec 100644 --- a/komga/src/main/kotlin/org/gotson/komga/application/service/BookLifecycle.kt +++ b/komga/src/main/kotlin/org/gotson/komga/application/service/BookLifecycle.kt @@ -1,7 +1,6 @@ package org.gotson.komga.application.service import mu.KotlinLogging -import org.apache.commons.lang3.time.DurationFormatUtils import org.gotson.komga.domain.model.Book import org.gotson.komga.domain.model.BookPageContent import org.gotson.komga.domain.model.ImageConversionException @@ -11,12 +10,7 @@ import org.gotson.komga.domain.persistence.BookRepository import org.gotson.komga.domain.service.BookAnalyzer import org.gotson.komga.infrastructure.image.ImageConverter import org.gotson.komga.infrastructure.image.ImageType -import org.springframework.scheduling.annotation.Async -import org.springframework.scheduling.annotation.AsyncResult import org.springframework.stereotype.Service -import org.springframework.transaction.annotation.Transactional -import java.util.concurrent.Future -import kotlin.system.measureTimeMillis private val logger = KotlinLogging.logger {} @@ -27,34 +21,26 @@ class BookLifecycle( private val imageConverter: ImageConverter ) { - @Transactional - @Async("analyzeBookTaskExecutor") - fun analyzeAndPersist(book: Book): Future { + fun analyzeAndPersist(book: Book) { logger.info { "Analyze and persist book: $book" } - return AsyncResult(measureTimeMillis { - try { - book.media = bookAnalyzer.analyze(book) - } catch (ex: Exception) { - logger.error(ex) { "Error while analyzing book: $book" } - book.media = Media(status = Media.Status.ERROR, comment = ex.message) - } - bookRepository.save(book) - }.also { logger.info { "Parsing finished in ${DurationFormatUtils.formatDurationHMS(it)}" } }) + try { + book.media = bookAnalyzer.analyze(book) + } catch (ex: Exception) { + logger.error(ex) { "Error while analyzing book: $book" } + book.media = Media(status = Media.Status.ERROR, comment = ex.message) + } + bookRepository.save(book) } - @Transactional - @Async("analyzeBookTaskExecutor") - fun regenerateThumbnailAndPersist(book: Book): Future { + fun regenerateThumbnailAndPersist(book: Book) { logger.info { "Regenerate thumbnail and persist book: $book" } - return AsyncResult(measureTimeMillis { - try { - book.media = bookAnalyzer.regenerateThumbnail(book) - } catch (ex: Exception) { - logger.error(ex) { "Error while recreating thumbnail" } - book.media = Media(status = Media.Status.ERROR) - } - bookRepository.save(book) - }.also { logger.info { "Thumbnail generated in ${DurationFormatUtils.formatDurationHMS(it)}" } }) + try { + book.media = bookAnalyzer.regenerateThumbnail(book) + } catch (ex: Exception) { + logger.error(ex) { "Error while recreating thumbnail" } + book.media = Media(status = Media.Status.ERROR) + } + bookRepository.save(book) } @Throws( diff --git a/komga/src/main/kotlin/org/gotson/komga/application/service/LibraryLifecycle.kt b/komga/src/main/kotlin/org/gotson/komga/application/service/LibraryLifecycle.kt index 9e24a40ba..19f1f7c4b 100644 --- a/komga/src/main/kotlin/org/gotson/komga/application/service/LibraryLifecycle.kt +++ b/komga/src/main/kotlin/org/gotson/komga/application/service/LibraryLifecycle.kt @@ -1,6 +1,7 @@ package org.gotson.komga.application.service import mu.KotlinLogging +import org.gotson.komga.application.tasks.TaskReceiver import org.gotson.komga.domain.model.DirectoryNotFoundException import org.gotson.komga.domain.model.DuplicateNameException import org.gotson.komga.domain.model.Library @@ -12,7 +13,6 @@ import org.springframework.stereotype.Service import org.springframework.transaction.annotation.Transactional import java.io.FileNotFoundException import java.nio.file.Files -import java.util.concurrent.RejectedExecutionException private val logger = KotlinLogging.logger {} @@ -21,7 +21,7 @@ class LibraryLifecycle( private val libraryRepository: LibraryRepository, private val seriesRepository: SeriesRepository, private val userRepository: KomgaUserRepository, - private val asyncOrchestrator: AsyncOrchestrator + private val taskReceiver: TaskReceiver ) { @Throws( @@ -49,15 +49,8 @@ class LibraryLifecycle( throw PathContainedInPath("Library path ${library.path()} is a parent of existing library ${it.name}: ${it.path()}") } - libraryRepository.save(library) - - logger.info { "Trying to launch a scan for the newly added library: ${library.name}" } - try { - asyncOrchestrator.scanAndAnalyzeAllLibraries() - } catch (e: RejectedExecutionException) { - logger.warn { "Another scan is already running, skipping" } - } + taskReceiver.scanLibrary(library) return library } diff --git a/komga/src/main/kotlin/org/gotson/komga/application/service/MetadataLifecycle.kt b/komga/src/main/kotlin/org/gotson/komga/application/service/MetadataLifecycle.kt index e2ae333c2..83029da32 100644 --- a/komga/src/main/kotlin/org/gotson/komga/application/service/MetadataLifecycle.kt +++ b/komga/src/main/kotlin/org/gotson/komga/application/service/MetadataLifecycle.kt @@ -6,41 +6,28 @@ import org.gotson.komga.domain.persistence.BookRepository import org.gotson.komga.domain.persistence.SeriesRepository import org.gotson.komga.domain.service.MetadataApplier import org.gotson.komga.infrastructure.metadata.BookMetadataProvider -import org.gotson.komga.infrastructure.metadata.comicinfo.ComicInfoProvider -import org.springframework.data.repository.findByIdOrNull -import org.springframework.scheduling.annotation.Async import org.springframework.stereotype.Service -import org.springframework.transaction.annotation.Transactional private val logger = KotlinLogging.logger {} @Service class MetadataLifecycle( - private val comicInfoProvider: ComicInfoProvider, private val bookMetadataProviders: List, private val metadataApplier: MetadataApplier, private val bookRepository: BookRepository, private val seriesRepository: SeriesRepository ) { - @Transactional - @Async("refreshMetadataTaskExecutor") fun refreshMetadata(book: Book) { logger.info { "Refresh metadata for book: $book" } - val loadedBook = bookRepository.findByIdOrNull(book.id) + bookMetadataProviders.forEach { + it.getBookMetadataFromBook(book)?.let { bPatch -> + metadataApplier.apply(bPatch, book) + bookRepository.save(book) - loadedBook?.let { bookToPatch -> - bookMetadataProviders.forEach { - val patch = it.getBookMetadataFromBook(bookToPatch) - - patch?.let { bPatch -> - metadataApplier.apply(bPatch, bookToPatch) - bookRepository.save(bookToPatch) - - bPatch.series?.let { sPatch -> - metadataApplier.apply(sPatch, bookToPatch.series) - seriesRepository.save(bookToPatch.series) - } + bPatch.series?.let { sPatch -> + metadataApplier.apply(sPatch, book.series) + seriesRepository.save(book.series) } } } diff --git a/komga/src/main/kotlin/org/gotson/komga/application/tasks/Task.kt b/komga/src/main/kotlin/org/gotson/komga/application/tasks/Task.kt new file mode 100644 index 000000000..5ffa7eaf0 --- /dev/null +++ b/komga/src/main/kotlin/org/gotson/komga/application/tasks/Task.kt @@ -0,0 +1,23 @@ +package org.gotson.komga.application.tasks + +import java.io.Serializable + +sealed class Task : Serializable { + abstract fun uniqueId(): String + + data class ScanLibrary(val libraryId: Long) : Task() { + override fun uniqueId() = "SCAN_LIBRARY_$libraryId" + } + + data class AnalyzeBook(val bookId: Long) : Task() { + override fun uniqueId() = "ANALYZE_BOOK_$bookId" + } + + data class GenerateBookThumbnail(val bookId: Long) : Task() { + override fun uniqueId() = "GENERATE_BOOK_THUMBNAIL_$bookId" + } + + data class RefreshBookMetadata(val bookId: Long) : Task() { + override fun uniqueId() = "REFRESH_BOOK_METADATA_$bookId" + } +} diff --git a/komga/src/main/kotlin/org/gotson/komga/application/tasks/TaskHandler.kt b/komga/src/main/kotlin/org/gotson/komga/application/tasks/TaskHandler.kt new file mode 100644 index 000000000..a1ef871bd --- /dev/null +++ b/komga/src/main/kotlin/org/gotson/komga/application/tasks/TaskHandler.kt @@ -0,0 +1,67 @@ +package org.gotson.komga.application.tasks + +import mu.KotlinLogging +import org.gotson.komga.application.service.BookLifecycle +import org.gotson.komga.application.service.MetadataLifecycle +import org.gotson.komga.domain.persistence.BookRepository +import org.gotson.komga.domain.persistence.LibraryRepository +import org.gotson.komga.domain.service.LibraryScanner +import org.gotson.komga.infrastructure.jms.QUEUE_TASKS +import org.gotson.komga.infrastructure.jms.QUEUE_TASKS_SELECTOR +import org.springframework.data.repository.findByIdOrNull +import org.springframework.jms.annotation.JmsListener +import org.springframework.stereotype.Service +import org.springframework.transaction.annotation.Transactional +import kotlin.time.measureTime + +private val logger = KotlinLogging.logger {} + +@Service +class TaskHandler( + private val taskReceiver: TaskReceiver, + private val libraryRepository: LibraryRepository, + private val bookRepository: BookRepository, + private val libraryScanner: LibraryScanner, + private val bookLifecycle: BookLifecycle, + private val metadataLifecycle: MetadataLifecycle +) { + + @JmsListener(destination = QUEUE_TASKS, selector = QUEUE_TASKS_SELECTOR) + @Transactional + fun handleTask(task: Task) { + logger.info { "Executing task: $task" } + try { + measureTime { + when (task) { + is Task.ScanLibrary -> + libraryRepository.findByIdOrNull(task.libraryId)?.let { + libraryScanner.scanRootFolder(it) + taskReceiver.analyzeUnknownBooks(it) + } ?: logger.warn { "Cannot execute task $task: Library does not exist" } + + is Task.AnalyzeBook -> + bookRepository.findByIdOrNull(task.bookId)?.let { + bookLifecycle.analyzeAndPersist(it) + taskReceiver.refreshBookMetadata(it) + } ?: logger.warn { "Cannot execute task $task: Book does not exist" } + + is Task.GenerateBookThumbnail -> + bookRepository.findByIdOrNull(task.bookId)?.let { + bookLifecycle.regenerateThumbnailAndPersist(it) + } ?: logger.warn { "Cannot execute task $task: Book does not exist" } + + is Task.RefreshBookMetadata -> + bookRepository.findByIdOrNull(task.bookId)?.let { + metadataLifecycle.refreshMetadata(it) + } ?: logger.warn { "Cannot execute task $task: Book does not exist" } + } + }.also { + logger.info { "Task $task executed in $it" } + } + } catch (e: Exception) { + logger.error(e) { "Task $task execution failed" } + } + } +} + + diff --git a/komga/src/main/kotlin/org/gotson/komga/application/tasks/TaskReceiver.kt b/komga/src/main/kotlin/org/gotson/komga/application/tasks/TaskReceiver.kt new file mode 100644 index 000000000..c108cce0d --- /dev/null +++ b/komga/src/main/kotlin/org/gotson/komga/application/tasks/TaskReceiver.kt @@ -0,0 +1,60 @@ +package org.gotson.komga.application.tasks + +import mu.KotlinLogging +import org.gotson.komga.domain.model.Book +import org.gotson.komga.domain.model.Library +import org.gotson.komga.domain.model.Media +import org.gotson.komga.domain.persistence.BookRepository +import org.gotson.komga.domain.persistence.LibraryRepository +import org.gotson.komga.infrastructure.jms.QUEUE_TASKS +import org.gotson.komga.infrastructure.jms.QUEUE_TASKS_TYPE +import org.gotson.komga.infrastructure.jms.QUEUE_TYPE +import org.gotson.komga.infrastructure.jms.QUEUE_UNIQUE_ID +import org.springframework.jms.core.JmsTemplate +import org.springframework.stereotype.Service + +private val logger = KotlinLogging.logger {} + +@Service +class TaskReceiver( + private val jmsTemplate: JmsTemplate, + private val libraryRepository: LibraryRepository, + private val bookRepository: BookRepository +) { + + fun scanLibraries() { + libraryRepository.findAll().forEach { scanLibrary(it) } + } + + fun scanLibrary(library: Library) { + submitTask(Task.ScanLibrary(library.id)) + } + + fun analyzeUnknownBooks(library: Library) { + bookRepository.findAllByMediaStatusAndSeriesLibrary(Media.Status.UNKNOWN, library).forEach { + submitTask(Task.AnalyzeBook(it.id)) + } + } + + fun analyzeBook(book: Book) { + submitTask(Task.AnalyzeBook(book.id)) + } + + fun generateBookThumbnail(book: Book) { + submitTask(Task.GenerateBookThumbnail(book.id)) + } + + fun refreshBookMetadata(book: Book) { + submitTask(Task.RefreshBookMetadata(book.id)) + } + + private fun submitTask(task: Task) { + logger.info { "Sending task: $task" } + jmsTemplate.convertAndSend(QUEUE_TASKS, task) { + it.apply { + setStringProperty(QUEUE_TYPE, QUEUE_TASKS_TYPE) + setStringProperty(QUEUE_UNIQUE_ID, task.uniqueId()) + } + } + } +} diff --git a/komga/src/main/kotlin/org/gotson/komga/domain/persistence/BookRepository.kt b/komga/src/main/kotlin/org/gotson/komga/domain/persistence/BookRepository.kt index a2727d9f8..fcf5c7671 100644 --- a/komga/src/main/kotlin/org/gotson/komga/domain/persistence/BookRepository.kt +++ b/komga/src/main/kotlin/org/gotson/komga/domain/persistence/BookRepository.kt @@ -28,8 +28,9 @@ interface BookRepository : JpaRepository, JpaSpecificationExecutor, pageable: Pageable): Page fun findBySeriesLibraryIn(seriesLibrary: Collection): List + fun findBySeriesLibrary(seriesLibrary: Library): List fun findByUrl(url: URL): Book? - fun findAllByMediaStatus(status: Media.Status): List + fun findAllByMediaStatusAndSeriesLibrary(status: Media.Status, library: Library): List fun findAllByMediaThumbnailIsNull(): List } diff --git a/komga/src/main/kotlin/org/gotson/komga/domain/service/LibraryScanner.kt b/komga/src/main/kotlin/org/gotson/komga/domain/service/LibraryScanner.kt index 4bc040f8b..4161fcc6d 100644 --- a/komga/src/main/kotlin/org/gotson/komga/domain/service/LibraryScanner.kt +++ b/komga/src/main/kotlin/org/gotson/komga/domain/service/LibraryScanner.kt @@ -1,18 +1,13 @@ package org.gotson.komga.domain.service import mu.KotlinLogging -import org.apache.commons.lang3.time.DurationFormatUtils -import org.gotson.komga.application.service.BookLifecycle -import org.gotson.komga.application.service.MetadataLifecycle import org.gotson.komga.domain.model.Library -import org.gotson.komga.domain.model.Media import org.gotson.komga.domain.persistence.BookRepository import org.gotson.komga.domain.persistence.SeriesRepository import org.springframework.stereotype.Service import org.springframework.transaction.annotation.Transactional import java.nio.file.Paths import java.time.temporal.ChronoUnit -import kotlin.system.measureTimeMillis private val logger = KotlinLogging.logger {} @@ -20,86 +15,57 @@ private val logger = KotlinLogging.logger {} class LibraryScanner( private val fileSystemScanner: FileSystemScanner, private val seriesRepository: SeriesRepository, - private val bookRepository: BookRepository, - private val bookLifecycle: BookLifecycle, - private val metadataLifecycle: MetadataLifecycle + private val bookRepository: BookRepository ) { @Transactional fun scanRootFolder(library: Library) { logger.info { "Updating library: ${library.name}, root folder: ${library.root}" } - measureTimeMillis { - val scannedSeries = fileSystemScanner.scanRootFolder(Paths.get(library.root.toURI())) + val scannedSeries = fileSystemScanner.scanRootFolder(Paths.get(library.root.toURI())) - // delete series that don't exist anymore - if (scannedSeries.isEmpty()) { - logger.info { "Scan returned no series, deleting all existing series" } - seriesRepository.deleteByLibraryId(library.id) + // delete series that don't exist anymore + if (scannedSeries.isEmpty()) { + logger.info { "Scan returned no series, deleting all existing series" } + seriesRepository.deleteByLibraryId(library.id) + } else { + scannedSeries.map { it.url }.let { urls -> + seriesRepository.findByLibraryIdAndUrlNotIn(library.id, urls).forEach { + logger.info { "Deleting series not on disk anymore: $it" } + seriesRepository.delete(it) + } + } + } + + scannedSeries.forEach { newSeries -> + val existingSeries = seriesRepository.findByLibraryIdAndUrl(library.id, newSeries.url) + + // if series does not exist, save it + if (existingSeries == null) { + logger.info { "Adding new series: $newSeries" } + seriesRepository.save(newSeries.also { it.library = library }) } else { - scannedSeries.map { it.url }.let { urls -> - seriesRepository.findByLibraryIdAndUrlNotIn(library.id, urls).forEach { - logger.info { "Deleting series not on disk anymore: $it" } - seriesRepository.delete(it) - } + // if series already exists, update it + if (newSeries.fileLastModified.truncatedTo(ChronoUnit.MILLIS) != existingSeries.fileLastModified.truncatedTo(ChronoUnit.MILLIS)) { + logger.info { "Series changed on disk, updating: $newSeries" } + existingSeries.fileLastModified = newSeries.fileLastModified + + // update list of books with existing entities if they exist + existingSeries.books = newSeries.books.map { newBook -> + val existingBook = bookRepository.findByUrl(newBook.url) ?: newBook + + if (newBook.fileLastModified.truncatedTo(ChronoUnit.MILLIS) != existingBook.fileLastModified.truncatedTo(ChronoUnit.MILLIS)) { + logger.info { "Book changed on disk, update and reset media status: $newBook" } + existingBook.fileLastModified = newBook.fileLastModified + existingBook.fileSize = newBook.fileSize + existingBook.media.reset() + } + existingBook + }.toMutableList() + + seriesRepository.save(existingSeries) } } - - scannedSeries.forEach { newSeries -> - val existingSeries = seriesRepository.findByLibraryIdAndUrl(library.id, newSeries.url) - - // if series does not exist, save it - if (existingSeries == null) { - logger.info { "Adding new series: $newSeries" } - seriesRepository.save(newSeries.also { it.library = library }) - } else { - // if series already exists, update it - if (newSeries.fileLastModified.truncatedTo(ChronoUnit.MILLIS) != existingSeries.fileLastModified.truncatedTo(ChronoUnit.MILLIS)) { - logger.info { "Series changed on disk, updating: $newSeries" } - existingSeries.fileLastModified = newSeries.fileLastModified - - // update list of books with existing entities if they exist - existingSeries.books = newSeries.books.map { newBook -> - val existingBook = bookRepository.findByUrl(newBook.url) ?: newBook - - if (newBook.fileLastModified.truncatedTo(ChronoUnit.MILLIS) != existingBook.fileLastModified.truncatedTo(ChronoUnit.MILLIS)) { - logger.info { "Book changed on disk, update and reset media status: $newBook" } - existingBook.fileLastModified = newBook.fileLastModified - existingBook.fileSize = newBook.fileSize - existingBook.media.reset() - } - existingBook - }.toMutableList() - - seriesRepository.save(existingSeries) - } - } - } - }.also { logger.info { "Library update finished in ${DurationFormatUtils.formatDurationHMS(it)}" } } - } - - fun analyzeUnknownBooks() { - logger.info { "Analyze all books in status: unknown" } - val booksToAnalyze = bookRepository.findAllByMediaStatus(Media.Status.UNKNOWN) - - var sumOfTasksTime = 0L - measureTimeMillis { - sumOfTasksTime = booksToAnalyze - .map { bookLifecycle.analyzeAndPersist(it) } - .map { - try { - it.get() - } catch (ex: Exception) { - 0L - } - } - .sum() - }.also { - logger.info { "Analyzed ${booksToAnalyze.size} books in ${DurationFormatUtils.formatDurationHMS(it)} (virtual: ${DurationFormatUtils.formatDurationHMS(sumOfTasksTime)})" } - } - - logger.info { "Refresh metadata for all books analyzed" } - booksToAnalyze.forEach { - metadataLifecycle.refreshMetadata(it) } } + } diff --git a/komga/src/main/kotlin/org/gotson/komga/infrastructure/async/AsyncConfiguration.kt b/komga/src/main/kotlin/org/gotson/komga/infrastructure/async/AsyncConfiguration.kt deleted file mode 100644 index d34c907db..000000000 --- a/komga/src/main/kotlin/org/gotson/komga/infrastructure/async/AsyncConfiguration.kt +++ /dev/null @@ -1,55 +0,0 @@ -package org.gotson.komga.infrastructure.async - -import org.gotson.komga.infrastructure.configuration.KomgaProperties -import org.springframework.context.annotation.Bean -import org.springframework.context.annotation.Configuration -import org.springframework.scheduling.annotation.EnableAsync -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor -import java.util.concurrent.Executor - -@Configuration -@EnableAsync -class AsyncConfiguration( - private val komgaProperties: KomgaProperties -) { - - @Bean("analyzeBookTaskExecutor") - fun analyzeBookTaskExecutor(): Executor = - ThreadPoolTaskExecutor().apply { - corePoolSize = komgaProperties.threads.analyzer - } - - @Bean("periodicScanTaskExecutor") - fun periodicScanTaskExecutor(): Executor = - ThreadPoolTaskExecutor().apply { - corePoolSize = 1 - maxPoolSize = 1 - setQueueCapacity(0) - } - - @Bean("regenerateThumbnailsTaskExecutor") - fun regenerateThumbnailsTaskExecutor(): Executor = - ThreadPoolTaskExecutor().apply { - corePoolSize = 1 - maxPoolSize = 1 - setQueueCapacity(0) - } - - @Bean("reAnalyzeBooksTaskExecutor") - fun reAnalyzeBooksTaskExecutor(): Executor = - ThreadPoolTaskExecutor().apply { - corePoolSize = 1 - } - - @Bean("refreshMetadataTaskExecutor") - fun refreshMetadataTaskExecutor(): Executor = - ThreadPoolTaskExecutor().apply { - corePoolSize = 1 - } - - @Bean("reRefreshMetadataTaskExecutor") - fun reRefreshMetadataTaskExecutor(): Executor = - ThreadPoolTaskExecutor().apply { - corePoolSize = 1 - } -} diff --git a/komga/src/main/kotlin/org/gotson/komga/infrastructure/jms/ArtemisConfig.kt b/komga/src/main/kotlin/org/gotson/komga/infrastructure/jms/ArtemisConfig.kt new file mode 100644 index 000000000..b910c23e7 --- /dev/null +++ b/komga/src/main/kotlin/org/gotson/komga/infrastructure/jms/ArtemisConfig.kt @@ -0,0 +1,32 @@ +package org.gotson.komga.infrastructure.jms + +import org.apache.activemq.artemis.api.core.RoutingType +import org.apache.activemq.artemis.core.config.CoreQueueConfiguration +import org.apache.activemq.artemis.core.settings.impl.AddressSettings +import org.springframework.boot.autoconfigure.jms.artemis.ArtemisConfigurationCustomizer +import org.springframework.context.annotation.Configuration + +const val QUEUE_UNIQUE_ID = "unique_id" +const val QUEUE_TYPE = "type" +const val QUEUE_TASKS = "tasks.background" +const val QUEUE_TASKS_TYPE = "task" +const val QUEUE_TASKS_SELECTOR = "$QUEUE_TYPE = '$QUEUE_TASKS_TYPE'" + +@Configuration +class ArtemisConfig : ArtemisConfigurationCustomizer { + override fun customize(configuration: org.apache.activemq.artemis.core.config.Configuration?) { + configuration?.let { + // disable prefetch, ensures messages stay in the queue and last value can have desired effect + it.addAddressesSetting(QUEUE_TASKS, AddressSettings().apply { + defaultConsumerWindowSize = 0 + }) + it.addQueueConfiguration( + CoreQueueConfiguration() + .setAddress(QUEUE_TASKS) + .setName(QUEUE_TASKS) + .setLastValueKey(QUEUE_UNIQUE_ID) + .setRoutingType(RoutingType.ANYCAST) + ) + } + } +} diff --git a/komga/src/main/kotlin/org/gotson/komga/interfaces/rest/AdminController.kt b/komga/src/main/kotlin/org/gotson/komga/interfaces/rest/AdminController.kt index 0dfb9169a..a18ae43d1 100644 --- a/komga/src/main/kotlin/org/gotson/komga/interfaces/rest/AdminController.kt +++ b/komga/src/main/kotlin/org/gotson/komga/interfaces/rest/AdminController.kt @@ -1,7 +1,7 @@ package org.gotson.komga.interfaces.rest import mu.KotlinLogging -import org.gotson.komga.application.service.AsyncOrchestrator +import org.gotson.komga.application.tasks.TaskReceiver import org.gotson.komga.domain.persistence.BookRepository import org.springframework.http.HttpStatus import org.springframework.security.access.prepost.PreAuthorize @@ -9,8 +9,6 @@ import org.springframework.web.bind.annotation.PostMapping import org.springframework.web.bind.annotation.RequestMapping import org.springframework.web.bind.annotation.ResponseStatus import org.springframework.web.bind.annotation.RestController -import org.springframework.web.server.ResponseStatusException -import java.util.concurrent.RejectedExecutionException private val logger = KotlinLogging.logger {} @@ -18,29 +16,21 @@ private val logger = KotlinLogging.logger {} @RequestMapping("api/v1/admin") @PreAuthorize("hasRole('ADMIN')") class AdminController( - private val asyncOrchestrator: AsyncOrchestrator, - private val bookRepository: BookRepository + private val bookRepository: BookRepository, + private val taskReceiver: TaskReceiver ) { @PostMapping("rpc/thumbnails/regenerate/all") @ResponseStatus(HttpStatus.ACCEPTED) fun regenerateAllThumbnails() { - try { - logger.info { "Regenerate thumbnail for all books" } - asyncOrchestrator.generateThumbnails(bookRepository.findAll()) - } catch (e: RejectedExecutionException) { - throw ResponseStatusException(HttpStatus.SERVICE_UNAVAILABLE, "Thumbnail regeneration task is already running") - } + logger.info { "Regenerate thumbnail for all books" } + bookRepository.findAll().forEach { taskReceiver.generateBookThumbnail(it) } } @PostMapping("rpc/thumbnails/regenerate/missing") @ResponseStatus(HttpStatus.ACCEPTED) fun regenerateMissingThumbnails() { - try { - logger.info { "Regenerate missing thumbnails" } - asyncOrchestrator.generateThumbnails(bookRepository.findAllByMediaThumbnailIsNull()) - } catch (e: RejectedExecutionException) { - throw ResponseStatusException(HttpStatus.SERVICE_UNAVAILABLE, "Thumbnail regeneration task is already running") - } + logger.info { "Regenerate missing thumbnails" } + bookRepository.findAllByMediaThumbnailIsNull().forEach { taskReceiver.generateBookThumbnail(it) } } } diff --git a/komga/src/main/kotlin/org/gotson/komga/interfaces/rest/BookController.kt b/komga/src/main/kotlin/org/gotson/komga/interfaces/rest/BookController.kt index 0de84a146..3b023a411 100644 --- a/komga/src/main/kotlin/org/gotson/komga/interfaces/rest/BookController.kt +++ b/komga/src/main/kotlin/org/gotson/komga/interfaces/rest/BookController.kt @@ -9,8 +9,8 @@ import io.swagger.v3.oas.annotations.media.Content import io.swagger.v3.oas.annotations.media.Schema import io.swagger.v3.oas.annotations.responses.ApiResponse import mu.KotlinLogging -import org.gotson.komga.application.service.AsyncOrchestrator import org.gotson.komga.application.service.BookLifecycle +import org.gotson.komga.application.tasks.TaskReceiver import org.gotson.komga.domain.model.Author import org.gotson.komga.domain.model.Book import org.gotson.komga.domain.model.BookMetadata @@ -57,7 +57,6 @@ import org.springframework.web.server.ResponseStatusException import java.io.FileNotFoundException import java.nio.file.NoSuchFileException import java.time.ZoneOffset -import java.util.concurrent.RejectedExecutionException import java.util.concurrent.TimeUnit import javax.persistence.criteria.JoinType import javax.validation.Valid @@ -69,7 +68,7 @@ private val logger = KotlinLogging.logger {} class BookController( private val bookRepository: BookRepository, private val bookLifecycle: BookLifecycle, - private val asyncOrchestrator: AsyncOrchestrator + private val taskReceiver: TaskReceiver ) { @PageableAsQueryParam @@ -345,11 +344,7 @@ class BookController( @ResponseStatus(HttpStatus.ACCEPTED) fun analyze(@PathVariable bookId: Long) { bookRepository.findByIdOrNull(bookId)?.let { book -> - try { - asyncOrchestrator.reAnalyzeBooks(listOf(book)) - } catch (e: RejectedExecutionException) { - throw ResponseStatusException(HttpStatus.SERVICE_UNAVAILABLE, "Another book analysis task is already running") - } + taskReceiver.analyzeBook(book) } ?: throw ResponseStatusException(HttpStatus.NOT_FOUND) } @@ -358,11 +353,7 @@ class BookController( @ResponseStatus(HttpStatus.ACCEPTED) fun refreshMetadata(@PathVariable bookId: Long) { bookRepository.findByIdOrNull(bookId)?.let { book -> - try { - asyncOrchestrator.refreshBooksMetadata(listOf(book)) - } catch (e: RejectedExecutionException) { - throw ResponseStatusException(HttpStatus.SERVICE_UNAVAILABLE, "Another metadata refresh task is already running") - } + taskReceiver.refreshBookMetadata(book) } ?: throw ResponseStatusException(HttpStatus.NOT_FOUND) } diff --git a/komga/src/main/kotlin/org/gotson/komga/interfaces/rest/LibraryController.kt b/komga/src/main/kotlin/org/gotson/komga/interfaces/rest/LibraryController.kt index e689232b0..b3e376eef 100644 --- a/komga/src/main/kotlin/org/gotson/komga/interfaces/rest/LibraryController.kt +++ b/komga/src/main/kotlin/org/gotson/komga/interfaces/rest/LibraryController.kt @@ -1,8 +1,8 @@ package org.gotson.komga.interfaces.rest import mu.KotlinLogging -import org.gotson.komga.application.service.AsyncOrchestrator import org.gotson.komga.application.service.LibraryLifecycle +import org.gotson.komga.application.tasks.TaskReceiver import org.gotson.komga.domain.model.DirectoryNotFoundException import org.gotson.komga.domain.model.DuplicateNameException import org.gotson.komga.domain.model.Library @@ -25,7 +25,6 @@ import org.springframework.web.bind.annotation.ResponseStatus import org.springframework.web.bind.annotation.RestController import org.springframework.web.server.ResponseStatusException import java.io.FileNotFoundException -import java.util.concurrent.RejectedExecutionException import javax.validation.Valid import javax.validation.constraints.NotBlank @@ -37,7 +36,7 @@ class LibraryController( private val libraryLifecycle: LibraryLifecycle, private val libraryRepository: LibraryRepository, private val bookRepository: BookRepository, - private val asyncOrchestrator: AsyncOrchestrator + private val taskReceiver: TaskReceiver ) { @GetMapping @@ -93,11 +92,7 @@ class LibraryController( @ResponseStatus(HttpStatus.ACCEPTED) fun scan(@PathVariable libraryId: Long) { libraryRepository.findByIdOrNull(libraryId)?.let { library -> - try { - asyncOrchestrator.scanAndAnalyzeOneLibrary(library) - } catch (e: RejectedExecutionException) { - throw ResponseStatusException(HttpStatus.SERVICE_UNAVAILABLE, "Another scan task is already running") - } + taskReceiver.scanLibrary(library) } ?: throw ResponseStatusException(HttpStatus.NOT_FOUND) } @@ -106,11 +101,7 @@ class LibraryController( @ResponseStatus(HttpStatus.ACCEPTED) fun analyze(@PathVariable libraryId: Long) { libraryRepository.findByIdOrNull(libraryId)?.let { library -> - try { - asyncOrchestrator.reAnalyzeBooks(bookRepository.findBySeriesLibraryIn(listOf(library))) - } catch (e: RejectedExecutionException) { - throw ResponseStatusException(HttpStatus.SERVICE_UNAVAILABLE, "Another book analysis task is already running") - } + bookRepository.findBySeriesLibrary(library).forEach { taskReceiver.analyzeBook(it) } } ?: throw ResponseStatusException(HttpStatus.NOT_FOUND) } @@ -119,11 +110,7 @@ class LibraryController( @ResponseStatus(HttpStatus.ACCEPTED) fun refreshMetadata(@PathVariable libraryId: Long) { libraryRepository.findByIdOrNull(libraryId)?.let { library -> - try { - asyncOrchestrator.refreshBooksMetadata(bookRepository.findBySeriesLibraryIn(listOf(library))) - } catch (e: RejectedExecutionException) { - throw ResponseStatusException(HttpStatus.SERVICE_UNAVAILABLE, "Another metadata refresh task is already running") - } + bookRepository.findBySeriesLibrary(library).forEach { taskReceiver.refreshBookMetadata(it) } } ?: throw ResponseStatusException(HttpStatus.NOT_FOUND) } } diff --git a/komga/src/main/kotlin/org/gotson/komga/interfaces/rest/SeriesController.kt b/komga/src/main/kotlin/org/gotson/komga/interfaces/rest/SeriesController.kt index 399de64f6..97e7585a0 100644 --- a/komga/src/main/kotlin/org/gotson/komga/interfaces/rest/SeriesController.kt +++ b/komga/src/main/kotlin/org/gotson/komga/interfaces/rest/SeriesController.kt @@ -9,7 +9,7 @@ import io.swagger.v3.oas.annotations.media.Content import io.swagger.v3.oas.annotations.media.Schema import io.swagger.v3.oas.annotations.responses.ApiResponse import mu.KotlinLogging -import org.gotson.komga.application.service.AsyncOrchestrator +import org.gotson.komga.application.tasks.TaskReceiver import org.gotson.komga.domain.model.Library import org.gotson.komga.domain.model.Media import org.gotson.komga.domain.model.Series @@ -45,7 +45,6 @@ import org.springframework.web.bind.annotation.RequestParam import org.springframework.web.bind.annotation.ResponseStatus import org.springframework.web.bind.annotation.RestController import org.springframework.web.server.ResponseStatusException -import java.util.concurrent.RejectedExecutionException import javax.validation.Valid private val logger = KotlinLogging.logger {} @@ -56,7 +55,7 @@ class SeriesController( private val seriesRepository: SeriesRepository, private val bookRepository: BookRepository, private val bookController: BookController, - private val asyncOrchestrator: AsyncOrchestrator + private val taskReceiver: TaskReceiver ) { @GetMapping @@ -222,11 +221,7 @@ class SeriesController( @ResponseStatus(HttpStatus.ACCEPTED) fun analyze(@PathVariable seriesId: Long) { seriesRepository.findByIdOrNull(seriesId)?.let { series -> - try { - asyncOrchestrator.reAnalyzeBooks(series.books) - } catch (e: RejectedExecutionException) { - throw ResponseStatusException(HttpStatus.SERVICE_UNAVAILABLE, "Another book analysis task is already running") - } + series.books.forEach { taskReceiver.analyzeBook(it) } } ?: throw ResponseStatusException(HttpStatus.NOT_FOUND) } @@ -235,11 +230,7 @@ class SeriesController( @ResponseStatus(HttpStatus.ACCEPTED) fun refreshMetadata(@PathVariable seriesId: Long) { seriesRepository.findByIdOrNull(seriesId)?.let { series -> - try { - asyncOrchestrator.refreshBooksMetadata(series.books) - } catch (e: RejectedExecutionException) { - throw ResponseStatusException(HttpStatus.SERVICE_UNAVAILABLE, "Another metadata refresh task is already running") - } + series.books.forEach { taskReceiver.refreshBookMetadata(it) } } ?: throw ResponseStatusException(HttpStatus.NOT_FOUND) } diff --git a/komga/src/main/kotlin/org/gotson/komga/interfaces/scheduler/PeriodicScannerController.kt b/komga/src/main/kotlin/org/gotson/komga/interfaces/scheduler/PeriodicScannerController.kt index ff598196c..5c0c6f263 100644 --- a/komga/src/main/kotlin/org/gotson/komga/interfaces/scheduler/PeriodicScannerController.kt +++ b/komga/src/main/kotlin/org/gotson/komga/interfaces/scheduler/PeriodicScannerController.kt @@ -1,29 +1,24 @@ package org.gotson.komga.interfaces.scheduler import mu.KotlinLogging -import org.gotson.komga.application.service.AsyncOrchestrator +import org.gotson.komga.application.tasks.TaskReceiver import org.springframework.boot.context.event.ApplicationReadyEvent import org.springframework.context.annotation.Profile import org.springframework.context.event.EventListener import org.springframework.scheduling.annotation.Scheduled import org.springframework.stereotype.Controller -import java.util.concurrent.RejectedExecutionException private val logger = KotlinLogging.logger {} @Profile("!test") @Controller class PeriodicScannerController( - private val asyncOrchestrator: AsyncOrchestrator + private val taskReceiver: TaskReceiver ) { @EventListener(ApplicationReadyEvent::class) @Scheduled(cron = "#{@komgaProperties.librariesScanCron ?: '-'}") - fun scanRootFolder() { - try { - asyncOrchestrator.scanAndAnalyzeAllLibraries() - } catch (e: RejectedExecutionException) { - logger.warn { "Another scan is already running, skipping" } - } + fun scanAllLibraries() { + taskReceiver.scanLibraries() } } diff --git a/komga/src/main/resources/application-dev.yml b/komga/src/main/resources/application-dev.yml index 11abc3adb..6ecbcad82 100644 --- a/komga/src/main/resources/application-dev.yml +++ b/komga/src/main/resources/application-dev.yml @@ -21,14 +21,15 @@ logging: max-history: 1 name: komga-dev.log level: - org.gotson.komga: DEBUG - web: DEBUG - # org.hibernate: - # stat: DEBUG - # SQL: DEBUG - # cache: DEBUG - # type.descriptor.sql.BasicBinder: TRACE - # org.springframework.security.web.FilterChainProxy: DEBUG + org.apache.activemq.audit.message: WARN +# web: DEBUG +# org.gotson.komga: DEBUG +# org.springframework.jms: DEBUG +# org.springframework.security.web.FilterChainProxy: DEBUG +# org.hibernate.stat: DEBUG +# org.hibernate.SQL: DEBUG +# org.hibernate.cache: DEBUG +# org.hibernate.type.descriptor.sql.BasicBinder: TRACE management.metrics.export.influx: # enabled: true diff --git a/komga/src/main/resources/application.yml b/komga/src/main/resources/application.yml index 2fce27448..815d22159 100644 --- a/komga/src/main/resources/application.yml +++ b/komga/src/main/resources/application.yml @@ -4,6 +4,8 @@ logging: file: max-history: 10 name: komga.log + level: + org.apache.activemq.audit.message: WARN komga: libraries-scan-cron: "0 */15 * * * ?" diff --git a/komga/src/test/kotlin/org/gotson/komga/domain/service/LibraryScannerTest.kt b/komga/src/test/kotlin/org/gotson/komga/domain/service/LibraryScannerTest.kt index 651dc0280..2575846e4 100644 --- a/komga/src/test/kotlin/org/gotson/komga/domain/service/LibraryScannerTest.kt +++ b/komga/src/test/kotlin/org/gotson/komga/domain/service/LibraryScannerTest.kt @@ -196,7 +196,7 @@ class LibraryScannerTest( libraryScanner.scanRootFolder(library) every { mockAnalyzer.analyze(any()) } returns Media(status = Media.Status.READY, mediaType = "application/zip", pages = mutableListOf(makeBookPage("1.jpg"), makeBookPage("2.jpg"))) - bookRepository.findAll().map { bookLifecycle.analyzeAndPersist(it) }.map { it.get() } + bookRepository.findAll().map { bookLifecycle.analyzeAndPersist(it) } // when libraryScanner.scanRootFolder(library) diff --git a/komga/src/test/kotlin/org/gotson/komga/infrastructure/jms/ArtemisConfigTest.kt b/komga/src/test/kotlin/org/gotson/komga/infrastructure/jms/ArtemisConfigTest.kt new file mode 100644 index 000000000..5bacd3771 --- /dev/null +++ b/komga/src/test/kotlin/org/gotson/komga/infrastructure/jms/ArtemisConfigTest.kt @@ -0,0 +1,95 @@ +package org.gotson.komga.infrastructure.jms + +import mu.KotlinLogging +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.ExtendWith +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.test.context.SpringBootTest +import org.springframework.jms.core.JmsTemplate +import org.springframework.jms.support.destination.JmsDestinationAccessor +import org.springframework.test.context.junit.jupiter.SpringExtension +import javax.jms.QueueBrowser +import javax.jms.Session + +private val logger = KotlinLogging.logger {} + +@ExtendWith(SpringExtension::class) +@SpringBootTest +class ArtemisConfigTest( + @Autowired private val jmsTemplate: JmsTemplate +) { + + init { + jmsTemplate.receiveTimeout = JmsDestinationAccessor.RECEIVE_TIMEOUT_NO_WAIT + } + + @BeforeEach + fun emptyQueue() { + while (jmsTemplate.receive(QUEUE_TASKS) != null) { + logger.info { "Emptying queue" } + } + } + + @Test + fun `when sending messages with the same unique id then messages are deduplicated`() { + for (i in 1..5) { + jmsTemplate.convertAndSend( + QUEUE_TASKS, + "message $i" + ) { + it.apply { setStringProperty(QUEUE_UNIQUE_ID, "1") } + } + } + + val size = jmsTemplate.browse(QUEUE_TASKS) { _: Session, browser: QueueBrowser -> + browser.enumeration.toList().size + } + + val msg = jmsTemplate.receiveAndConvert(QUEUE_TASKS) as String + + assertThat(msg).isEqualTo("message 5") + assertThat(size).isEqualTo(1) + } + + @Test + fun `when sending messages with some common unique id then messages are deduplicated`() { + for (i in 1..6) { + jmsTemplate.convertAndSend( + QUEUE_TASKS, + "message $i" + ) { + it.apply { setStringProperty(QUEUE_UNIQUE_ID, i.rem(2).toString()) } + } + } + + val size = jmsTemplate.browse(QUEUE_TASKS) { _: Session, browser: QueueBrowser -> + browser.enumeration.toList().size + } + + val msg = jmsTemplate.receiveAndConvert(QUEUE_TASKS) as String + + assertThat(msg).isEqualTo("message 5") + assertThat(size).isEqualTo(2) + } + + @Test + fun `when sending messages without unique id then messages are not deduplicated`() { + for (i in 1..5) { + jmsTemplate.convertAndSend( + QUEUE_TASKS, + "message $i" + ) + } + + val size = jmsTemplate.browse(QUEUE_TASKS) { _: Session, browser: QueueBrowser -> + browser.enumeration.toList().size + } + + val msg = jmsTemplate.receiveAndConvert(QUEUE_TASKS) as String + + assertThat(msg).isEqualTo("message 1") + assertThat(size).isEqualTo(5) + } +}