refactor: simplify JMS configuration

This commit is contained in:
Gauthier Roebroeck 2022-02-09 18:23:26 +08:00
parent 745fe09ec2
commit a8340e816b
7 changed files with 22 additions and 29 deletions

View file

@ -1,9 +1,8 @@
package org.gotson.komga.application.events
import org.gotson.komga.domain.model.DomainEvent
import org.gotson.komga.infrastructure.jms.QUEUE_SSE
import org.gotson.komga.infrastructure.jms.QUEUE_SSE_TYPE
import org.gotson.komga.infrastructure.jms.QUEUE_TYPE
import org.gotson.komga.infrastructure.jms.JMS_PROPERTY_TYPE
import org.gotson.komga.infrastructure.jms.TOPIC_EVENTS
import org.springframework.jms.core.JmsTemplate
import org.springframework.stereotype.Service
import javax.jms.ConnectionFactory
@ -17,9 +16,9 @@ class EventPublisher(
}
fun publishEvent(event: DomainEvent) {
jmsTemplate.convertAndSend(QUEUE_SSE, event) {
jmsTemplate.convertAndSend(TOPIC_EVENTS, event) {
it.apply {
setStringProperty(QUEUE_TYPE, QUEUE_SSE_TYPE)
setStringProperty(JMS_PROPERTY_TYPE, event.javaClass.simpleName)
}
}
}

View file

@ -15,7 +15,6 @@ import org.gotson.komga.domain.service.SeriesLifecycle
import org.gotson.komga.domain.service.SeriesMetadataLifecycle
import org.gotson.komga.infrastructure.jms.QUEUE_FACTORY
import org.gotson.komga.infrastructure.jms.QUEUE_TASKS
import org.gotson.komga.infrastructure.jms.QUEUE_TASKS_SELECTOR
import org.gotson.komga.infrastructure.search.SearchIndexLifecycle
import org.springframework.jms.annotation.JmsListener
import org.springframework.stereotype.Service
@ -42,7 +41,7 @@ class TaskHandler(
private val searchIndexLifecycle: SearchIndexLifecycle,
) {
@JmsListener(destination = QUEUE_TASKS, selector = QUEUE_TASKS_SELECTOR, containerFactory = QUEUE_FACTORY, concurrency = "#{@komgaProperties.taskConsumers}-#{@komgaProperties.taskConsumersMax}")
@JmsListener(destination = QUEUE_TASKS, containerFactory = QUEUE_FACTORY, concurrency = "#{@komgaProperties.taskConsumers}-#{@komgaProperties.taskConsumersMax}")
fun handleTask(task: Task) {
logger.info { "Executing task: $task" }
try {

View file

@ -12,10 +12,8 @@ import org.gotson.komga.domain.persistence.BookRepository
import org.gotson.komga.domain.persistence.LibraryRepository
import org.gotson.komga.domain.service.BookConverter
import org.gotson.komga.domain.service.PageHashLifecycle
import org.gotson.komga.infrastructure.jms.QUEUE_SUB_TYPE
import org.gotson.komga.infrastructure.jms.JMS_PROPERTY_TYPE
import org.gotson.komga.infrastructure.jms.QUEUE_TASKS
import org.gotson.komga.infrastructure.jms.QUEUE_TASKS_TYPE
import org.gotson.komga.infrastructure.jms.QUEUE_TYPE
import org.gotson.komga.infrastructure.jms.QUEUE_UNIQUE_ID
import org.gotson.komga.infrastructure.jooq.UnpagedSorted
import org.gotson.komga.infrastructure.search.LuceneEntity
@ -156,9 +154,8 @@ class TaskReceiver(
logger.info { "Sending task: $task" }
jmsTemplates[task.priority]!!.convertAndSend(QUEUE_TASKS, task) {
it.apply {
setStringProperty(QUEUE_TYPE, QUEUE_TASKS_TYPE)
setStringProperty(QUEUE_UNIQUE_ID, task.uniqueId())
setStringProperty(QUEUE_SUB_TYPE, task::class.simpleName)
setStringProperty(JMS_PROPERTY_TYPE, task.javaClass.simpleName)
task.groupId?.let { groupId -> setStringProperty("JMSXGroupID", groupId) }
}
}

View file

@ -15,16 +15,11 @@ import org.apache.activemq.artemis.core.config.Configuration as ArtemisConfigura
private val logger = KotlinLogging.logger {}
const val QUEUE_UNIQUE_ID = "unique_id"
const val QUEUE_TYPE = "type"
const val QUEUE_SUB_TYPE = "subtype"
const val QUEUE_TASKS = "tasks.background"
const val QUEUE_TASKS_TYPE = "task"
const val QUEUE_TASKS_SELECTOR = "$QUEUE_TYPE = '$QUEUE_TASKS_TYPE'"
const val TOPIC_EVENTS = "domain.events"
const val QUEUE_SSE = "sse"
const val QUEUE_SSE_TYPE = "sse"
const val QUEUE_SSE_SELECTOR = "$QUEUE_TYPE = '$QUEUE_SSE_TYPE'"
const val JMS_PROPERTY_TYPE = "type"
const val TOPIC_FACTORY = "topicJmsListenerContainerFactory"
const val QUEUE_FACTORY = "queueJmsListenerContainerFactory"
@ -49,8 +44,8 @@ class ArtemisConfig : ArtemisConfigurationCustomizer {
.setRoutingType(RoutingType.ANYCAST),
)
it.addQueueConfiguration(
QueueConfiguration(QUEUE_SSE)
.setAddress(QUEUE_SSE)
QueueConfiguration(TOPIC_EVENTS)
.setAddress(TOPIC_EVENTS)
.setRoutingType(RoutingType.MULTICAST),
)
}

View file

@ -10,8 +10,7 @@ import org.gotson.komga.domain.model.SeriesCollection
import org.gotson.komga.domain.model.SeriesSearchWithReadProgress
import org.gotson.komga.domain.persistence.ReadListRepository
import org.gotson.komga.domain.persistence.SeriesCollectionRepository
import org.gotson.komga.infrastructure.jms.QUEUE_SSE
import org.gotson.komga.infrastructure.jms.QUEUE_SSE_SELECTOR
import org.gotson.komga.infrastructure.jms.TOPIC_EVENTS
import org.gotson.komga.infrastructure.jms.TOPIC_FACTORY
import org.gotson.komga.interfaces.api.persistence.BookDtoRepository
import org.gotson.komga.interfaces.api.persistence.SeriesDtoRepository
@ -79,7 +78,7 @@ class SearchIndexLifecycle(
logger.info { "Lucene index version: ${luceneHelper.getIndexVersion()}" }
}
@JmsListener(destination = QUEUE_SSE, selector = QUEUE_SSE_SELECTOR, containerFactory = TOPIC_FACTORY)
@JmsListener(destination = TOPIC_EVENTS, containerFactory = TOPIC_FACTORY)
fun consumeEvents(event: DomainEvent) {
when (event) {
is DomainEvent.SeriesAdded -> seriesDtoRepository.findByIdOrNull(event.series.id, "unused")?.toDocument()?.let { addEntity(it) }

View file

@ -4,10 +4,9 @@ import mu.KotlinLogging
import org.gotson.komga.domain.model.DomainEvent
import org.gotson.komga.domain.model.KomgaUser
import org.gotson.komga.domain.persistence.BookRepository
import org.gotson.komga.infrastructure.jms.QUEUE_SSE
import org.gotson.komga.infrastructure.jms.QUEUE_SSE_SELECTOR
import org.gotson.komga.infrastructure.jms.QUEUE_SUB_TYPE
import org.gotson.komga.infrastructure.jms.JMS_PROPERTY_TYPE
import org.gotson.komga.infrastructure.jms.QUEUE_TASKS
import org.gotson.komga.infrastructure.jms.TOPIC_EVENTS
import org.gotson.komga.infrastructure.jms.TOPIC_FACTORY
import org.gotson.komga.infrastructure.security.KomgaPrincipal
import org.gotson.komga.infrastructure.web.toFilePath
@ -64,7 +63,7 @@ class SseController(
if (emitters.isNotEmpty()) {
val tasksCount = jmsTemplate.browse(QUEUE_TASKS) { _: Session, browser: QueueBrowser ->
browser.enumeration.toList()
.groupingBy { (it as ObjectMessage).getStringProperty(QUEUE_SUB_TYPE) ?: "unknown" }
.groupingBy { (it as ObjectMessage).getStringProperty(JMS_PROPERTY_TYPE) ?: "unknown" }
.eachCount()
} ?: emptyMap()
@ -72,7 +71,7 @@ class SseController(
}
}
@JmsListener(destination = QUEUE_SSE, selector = QUEUE_SSE_SELECTOR, containerFactory = TOPIC_FACTORY)
@JmsListener(destination = TOPIC_EVENTS, containerFactory = TOPIC_FACTORY)
fun handleSseEvent(event: DomainEvent) {
when (event) {
is DomainEvent.LibraryAdded -> emitSse("LibraryAdded", LibrarySseDto(event.library.id))

View file

@ -1,7 +1,9 @@
package org.gotson.komga.infrastructure.jms
import com.ninjasquad.springmockk.MockkBean
import mu.KotlinLogging
import org.assertj.core.api.Assertions.assertThat
import org.gotson.komga.application.tasks.TaskHandler
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtendWith
@ -21,6 +23,9 @@ class ArtemisConfigTest(
@Autowired private val jmsTemplate: JmsTemplate,
) {
@MockkBean
private lateinit var taskHandler: TaskHandler // to avoid the taskHandler from picking up the messages
init {
jmsTemplate.receiveTimeout = JmsDestinationAccessor.RECEIVE_TIMEOUT_NO_WAIT
}