-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathDefaultNotificationDocumentChangeConsumer.kt
73 lines (63 loc) · 3.02 KB
/
DefaultNotificationDocumentChangeConsumer.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package com.aamdigital.aambackendservice.notification.core
import com.aamdigital.aambackendservice.changes.domain.DocumentChangeEvent
import com.aamdigital.aambackendservice.error.AamException
import com.aamdigital.aambackendservice.notification.di.NotificationQueueConfiguration.Companion.DOCUMENT_CHANGES_NOTIFICATION_QUEUE
import com.aamdigital.aambackendservice.queue.core.QueueMessageParser
import com.rabbitmq.client.Channel
import org.slf4j.LoggerFactory
import org.springframework.amqp.AmqpRejectAndDontRequeueException
import org.springframework.amqp.core.Message
import org.springframework.amqp.rabbit.annotation.RabbitListener
class DefaultNotificationDocumentChangeConsumer(
private val messageParser: QueueMessageParser,
private val syncNotificationConfigUseCase: SyncNotificationConfigUseCase,
private val applyNotificationRulesUseCase: ApplyNotificationRulesUseCase,
) : NotificationDocumentChangeConsumer {
private val logger = LoggerFactory.getLogger(javaClass)
@RabbitListener(
queues = [DOCUMENT_CHANGES_NOTIFICATION_QUEUE],
// avoid concurrent processing so that we do not trigger multiple calculations for same data unnecessarily
concurrency = "1-1",
)
override fun consume(rawMessage: String, message: Message, channel: Channel) {
val type = try {
messageParser.getTypeKClass(rawMessage.toByteArray())
} catch (ex: AamException) {
throw AmqpRejectAndDontRequeueException("[${ex.code}] ${ex.localizedMessage}", ex)
}
when (type.qualifiedName) {
DocumentChangeEvent::class.qualifiedName -> {
val payload: DocumentChangeEvent = messageParser.getPayload(
body = rawMessage.toByteArray(),
kClass = DocumentChangeEvent::class
)
if (payload.documentId.startsWith("NotificationConfig:")) {
logger.trace(payload.toString())
syncNotificationConfigUseCase.run(
request = SyncNotificationConfigRequest(
notificationConfigDatabase = "app", // todo: configurable
notificationConfigId = payload.documentId,
notificationConfigRev = payload.rev,
)
)
return
}
applyNotificationRulesUseCase.run(
request = ApplyNotificationRulesRequest(
documentChangeEvent = payload
)
)
return
}
else -> {
logger.warn(
"[DefaultNotificationDocumentChangeConsumer] Could not find any use case for this EventType: {}",
type.qualifiedName,
)
throw AmqpRejectAndDontRequeueException(
"[NO_USECASE_CONFIGURED] Could not find matching use case for: ${type.qualifiedName}",
)
}
}
}
}