diff --git a/komga/src/main/kotlin/org/gotson/komga/application/events/EventPublisher.kt b/komga/src/main/kotlin/org/gotson/komga/application/events/EventPublisher.kt index e3b99961a..2b3ae8cae 100644 --- a/komga/src/main/kotlin/org/gotson/komga/application/events/EventPublisher.kt +++ b/komga/src/main/kotlin/org/gotson/komga/application/events/EventPublisher.kt @@ -1,9 +1,8 @@ package org.gotson.komga.application.events import org.gotson.komga.domain.model.DomainEvent -import org.gotson.komga.infrastructure.jms.QUEUE_SSE -import org.gotson.komga.infrastructure.jms.QUEUE_SSE_TYPE -import org.gotson.komga.infrastructure.jms.QUEUE_TYPE +import org.gotson.komga.infrastructure.jms.JMS_PROPERTY_TYPE +import org.gotson.komga.infrastructure.jms.TOPIC_EVENTS import org.springframework.jms.core.JmsTemplate import org.springframework.stereotype.Service import javax.jms.ConnectionFactory @@ -17,9 +16,9 @@ class EventPublisher( } fun publishEvent(event: DomainEvent) { - jmsTemplate.convertAndSend(QUEUE_SSE, event) { + jmsTemplate.convertAndSend(TOPIC_EVENTS, event) { it.apply { - setStringProperty(QUEUE_TYPE, QUEUE_SSE_TYPE) + setStringProperty(JMS_PROPERTY_TYPE, event.javaClass.simpleName) } } } diff --git a/komga/src/main/kotlin/org/gotson/komga/application/tasks/TaskHandler.kt b/komga/src/main/kotlin/org/gotson/komga/application/tasks/TaskHandler.kt index ef99fe8ac..549b5bc1d 100644 --- a/komga/src/main/kotlin/org/gotson/komga/application/tasks/TaskHandler.kt +++ b/komga/src/main/kotlin/org/gotson/komga/application/tasks/TaskHandler.kt @@ -15,7 +15,6 @@ import org.gotson.komga.domain.service.SeriesLifecycle import org.gotson.komga.domain.service.SeriesMetadataLifecycle import org.gotson.komga.infrastructure.jms.QUEUE_FACTORY import org.gotson.komga.infrastructure.jms.QUEUE_TASKS -import org.gotson.komga.infrastructure.jms.QUEUE_TASKS_SELECTOR import org.gotson.komga.infrastructure.search.SearchIndexLifecycle import org.springframework.jms.annotation.JmsListener import org.springframework.stereotype.Service @@ -42,7 +41,7 @@ class TaskHandler( private val searchIndexLifecycle: SearchIndexLifecycle, ) { - @JmsListener(destination = QUEUE_TASKS, selector = QUEUE_TASKS_SELECTOR, containerFactory = QUEUE_FACTORY, concurrency = "#{@komgaProperties.taskConsumers}-#{@komgaProperties.taskConsumersMax}") + @JmsListener(destination = QUEUE_TASKS, containerFactory = QUEUE_FACTORY, concurrency = "#{@komgaProperties.taskConsumers}-#{@komgaProperties.taskConsumersMax}") fun handleTask(task: Task) { logger.info { "Executing task: $task" } try { diff --git a/komga/src/main/kotlin/org/gotson/komga/application/tasks/TaskReceiver.kt b/komga/src/main/kotlin/org/gotson/komga/application/tasks/TaskReceiver.kt index 24f30fea2..95ae77c0e 100644 --- a/komga/src/main/kotlin/org/gotson/komga/application/tasks/TaskReceiver.kt +++ b/komga/src/main/kotlin/org/gotson/komga/application/tasks/TaskReceiver.kt @@ -12,10 +12,8 @@ import org.gotson.komga.domain.persistence.BookRepository import org.gotson.komga.domain.persistence.LibraryRepository import org.gotson.komga.domain.service.BookConverter import org.gotson.komga.domain.service.PageHashLifecycle -import org.gotson.komga.infrastructure.jms.QUEUE_SUB_TYPE +import org.gotson.komga.infrastructure.jms.JMS_PROPERTY_TYPE import org.gotson.komga.infrastructure.jms.QUEUE_TASKS -import org.gotson.komga.infrastructure.jms.QUEUE_TASKS_TYPE -import org.gotson.komga.infrastructure.jms.QUEUE_TYPE import org.gotson.komga.infrastructure.jms.QUEUE_UNIQUE_ID import org.gotson.komga.infrastructure.jooq.UnpagedSorted import org.gotson.komga.infrastructure.search.LuceneEntity @@ -156,9 +154,8 @@ class TaskReceiver( logger.info { "Sending task: $task" } jmsTemplates[task.priority]!!.convertAndSend(QUEUE_TASKS, task) { it.apply { - setStringProperty(QUEUE_TYPE, QUEUE_TASKS_TYPE) setStringProperty(QUEUE_UNIQUE_ID, task.uniqueId()) - setStringProperty(QUEUE_SUB_TYPE, task::class.simpleName) + setStringProperty(JMS_PROPERTY_TYPE, task.javaClass.simpleName) task.groupId?.let { groupId -> setStringProperty("JMSXGroupID", groupId) } } } diff --git a/komga/src/main/kotlin/org/gotson/komga/infrastructure/jms/ArtemisConfig.kt b/komga/src/main/kotlin/org/gotson/komga/infrastructure/jms/ArtemisConfig.kt index 23630b1bf..9f1d0b82f 100644 --- a/komga/src/main/kotlin/org/gotson/komga/infrastructure/jms/ArtemisConfig.kt +++ b/komga/src/main/kotlin/org/gotson/komga/infrastructure/jms/ArtemisConfig.kt @@ -15,16 +15,11 @@ import org.apache.activemq.artemis.core.config.Configuration as ArtemisConfigura private val logger = KotlinLogging.logger {} const val QUEUE_UNIQUE_ID = "unique_id" -const val QUEUE_TYPE = "type" -const val QUEUE_SUB_TYPE = "subtype" const val QUEUE_TASKS = "tasks.background" -const val QUEUE_TASKS_TYPE = "task" -const val QUEUE_TASKS_SELECTOR = "$QUEUE_TYPE = '$QUEUE_TASKS_TYPE'" +const val TOPIC_EVENTS = "domain.events" -const val QUEUE_SSE = "sse" -const val QUEUE_SSE_TYPE = "sse" -const val QUEUE_SSE_SELECTOR = "$QUEUE_TYPE = '$QUEUE_SSE_TYPE'" +const val JMS_PROPERTY_TYPE = "type" const val TOPIC_FACTORY = "topicJmsListenerContainerFactory" const val QUEUE_FACTORY = "queueJmsListenerContainerFactory" @@ -49,8 +44,8 @@ class ArtemisConfig : ArtemisConfigurationCustomizer { .setRoutingType(RoutingType.ANYCAST), ) it.addQueueConfiguration( - QueueConfiguration(QUEUE_SSE) - .setAddress(QUEUE_SSE) + QueueConfiguration(TOPIC_EVENTS) + .setAddress(TOPIC_EVENTS) .setRoutingType(RoutingType.MULTICAST), ) } diff --git a/komga/src/main/kotlin/org/gotson/komga/infrastructure/search/SearchIndexLifecycle.kt b/komga/src/main/kotlin/org/gotson/komga/infrastructure/search/SearchIndexLifecycle.kt index 0b377e711..d5d8bb12a 100644 --- a/komga/src/main/kotlin/org/gotson/komga/infrastructure/search/SearchIndexLifecycle.kt +++ b/komga/src/main/kotlin/org/gotson/komga/infrastructure/search/SearchIndexLifecycle.kt @@ -10,8 +10,7 @@ import org.gotson.komga.domain.model.SeriesCollection import org.gotson.komga.domain.model.SeriesSearchWithReadProgress import org.gotson.komga.domain.persistence.ReadListRepository import org.gotson.komga.domain.persistence.SeriesCollectionRepository -import org.gotson.komga.infrastructure.jms.QUEUE_SSE -import org.gotson.komga.infrastructure.jms.QUEUE_SSE_SELECTOR +import org.gotson.komga.infrastructure.jms.TOPIC_EVENTS import org.gotson.komga.infrastructure.jms.TOPIC_FACTORY import org.gotson.komga.interfaces.api.persistence.BookDtoRepository import org.gotson.komga.interfaces.api.persistence.SeriesDtoRepository @@ -79,7 +78,7 @@ class SearchIndexLifecycle( logger.info { "Lucene index version: ${luceneHelper.getIndexVersion()}" } } - @JmsListener(destination = QUEUE_SSE, selector = QUEUE_SSE_SELECTOR, containerFactory = TOPIC_FACTORY) + @JmsListener(destination = TOPIC_EVENTS, containerFactory = TOPIC_FACTORY) fun consumeEvents(event: DomainEvent) { when (event) { is DomainEvent.SeriesAdded -> seriesDtoRepository.findByIdOrNull(event.series.id, "unused")?.toDocument()?.let { addEntity(it) } diff --git a/komga/src/main/kotlin/org/gotson/komga/interfaces/sse/SseController.kt b/komga/src/main/kotlin/org/gotson/komga/interfaces/sse/SseController.kt index 4e0296b57..5a03b2ae1 100644 --- a/komga/src/main/kotlin/org/gotson/komga/interfaces/sse/SseController.kt +++ b/komga/src/main/kotlin/org/gotson/komga/interfaces/sse/SseController.kt @@ -4,10 +4,9 @@ import mu.KotlinLogging import org.gotson.komga.domain.model.DomainEvent import org.gotson.komga.domain.model.KomgaUser import org.gotson.komga.domain.persistence.BookRepository -import org.gotson.komga.infrastructure.jms.QUEUE_SSE -import org.gotson.komga.infrastructure.jms.QUEUE_SSE_SELECTOR -import org.gotson.komga.infrastructure.jms.QUEUE_SUB_TYPE +import org.gotson.komga.infrastructure.jms.JMS_PROPERTY_TYPE import org.gotson.komga.infrastructure.jms.QUEUE_TASKS +import org.gotson.komga.infrastructure.jms.TOPIC_EVENTS import org.gotson.komga.infrastructure.jms.TOPIC_FACTORY import org.gotson.komga.infrastructure.security.KomgaPrincipal import org.gotson.komga.infrastructure.web.toFilePath @@ -64,7 +63,7 @@ class SseController( if (emitters.isNotEmpty()) { val tasksCount = jmsTemplate.browse(QUEUE_TASKS) { _: Session, browser: QueueBrowser -> browser.enumeration.toList() - .groupingBy { (it as ObjectMessage).getStringProperty(QUEUE_SUB_TYPE) ?: "unknown" } + .groupingBy { (it as ObjectMessage).getStringProperty(JMS_PROPERTY_TYPE) ?: "unknown" } .eachCount() } ?: emptyMap() @@ -72,7 +71,7 @@ class SseController( } } - @JmsListener(destination = QUEUE_SSE, selector = QUEUE_SSE_SELECTOR, containerFactory = TOPIC_FACTORY) + @JmsListener(destination = TOPIC_EVENTS, containerFactory = TOPIC_FACTORY) fun handleSseEvent(event: DomainEvent) { when (event) { is DomainEvent.LibraryAdded -> emitSse("LibraryAdded", LibrarySseDto(event.library.id)) diff --git a/komga/src/test/kotlin/org/gotson/komga/infrastructure/jms/ArtemisConfigTest.kt b/komga/src/test/kotlin/org/gotson/komga/infrastructure/jms/ArtemisConfigTest.kt index 66bc1dac3..44f332072 100644 --- a/komga/src/test/kotlin/org/gotson/komga/infrastructure/jms/ArtemisConfigTest.kt +++ b/komga/src/test/kotlin/org/gotson/komga/infrastructure/jms/ArtemisConfigTest.kt @@ -1,7 +1,9 @@ package org.gotson.komga.infrastructure.jms +import com.ninjasquad.springmockk.MockkBean import mu.KotlinLogging import org.assertj.core.api.Assertions.assertThat +import org.gotson.komga.application.tasks.TaskHandler import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.extension.ExtendWith @@ -21,6 +23,9 @@ class ArtemisConfigTest( @Autowired private val jmsTemplate: JmsTemplate, ) { + @MockkBean + private lateinit var taskHandler: TaskHandler // to avoid the taskHandler from picking up the messages + init { jmsTemplate.receiveTimeout = JmsDestinationAccessor.RECEIVE_TIMEOUT_NO_WAIT }