-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathDefaultChangeEventPublisher.kt
87 lines (79 loc) · 3.13 KB
/
DefaultChangeEventPublisher.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package com.aamdigital.aambackendservice.changes.queue
import com.aamdigital.aambackendservice.changes.core.ChangeEventPublisher
import com.aamdigital.aambackendservice.changes.domain.DatabaseChangeEvent
import com.aamdigital.aambackendservice.changes.domain.DocumentChangeEvent
import com.aamdigital.aambackendservice.error.AamErrorCode
import com.aamdigital.aambackendservice.error.AamException
import com.aamdigital.aambackendservice.error.InternalServerException
import com.aamdigital.aambackendservice.queue.core.QueueMessage
import com.fasterxml.jackson.databind.ObjectMapper
import org.slf4j.LoggerFactory
import org.springframework.amqp.AmqpException
import org.springframework.amqp.rabbit.core.RabbitTemplate
import java.time.Instant
import java.time.ZoneOffset
import java.time.format.DateTimeFormatter
import java.util.*
/**
 * [ChangeEventPublisher] implementation that wraps change events in a [QueueMessage],
 * serializes that message to a JSON string with the given [ObjectMapper] and hands the
 * string to the given [RabbitTemplate].
 *
 * Only [AmqpException]s raised while sending are translated into an
 * [InternalServerException]; any other exception (for example one thrown while
 * serializing the message) propagates to the caller unchanged.
 */
class DefaultChangeEventPublisher(
    private val objectMapper: ObjectMapper,
    private val rabbitTemplate: RabbitTemplate,
) : ChangeEventPublisher {

    /** Error codes attached to the exceptions thrown by this publisher. */
    enum class DefaultChangeEventPublisherErrorCode : AamErrorCode {
        EVENT_PUBLISH_ERROR
    }

    private val logger = LoggerFactory.getLogger(javaClass)

    /**
     * Publishes a [DatabaseChangeEvent].
     *
     * The serialized message is sent with the two-argument `convertAndSend` overload,
     * passing [channel] as its first argument.
     * NOTE(review): in Spring AMQP that argument is the routing key on the template's
     * default exchange - confirm this matches the intended queue setup.
     *
     * @param channel target passed as first argument to `convertAndSend`
     * @param event the change event to wrap and publish
     * @return the [QueueMessage] that was created for (and sent with) this event
     * @throws InternalServerException with code `EVENT_PUBLISH_ERROR` if sending fails
     *   with an [AmqpException]
     */
    @Throws(AamException::class)
    override fun publish(channel: String, event: DatabaseChangeEvent): QueueMessage {
        val message = QueueMessage(
            id = UUID.randomUUID(),
            eventType = DatabaseChangeEvent::class.java.canonicalName,
            event = event,
            createdAt = currentUtcTimestamp(),
        )
        // Serialize before talking to the broker: a serialization failure is not an
        // AmqpException and was never handled by the catch block below.
        val payload = objectMapper.writeValueAsString(message)

        try {
            rabbitTemplate.convertAndSend(channel, payload)
        } catch (ex: AmqpException) {
            throw InternalServerException(
                message = "Could not publish DatabaseChangeEvent: $event",
                code = DefaultChangeEventPublisherErrorCode.EVENT_PUBLISH_ERROR,
                cause = ex
            )
        }

        return message
    }

    /**
     * Publishes a [DocumentChangeEvent] to the given [exchange] using an empty routing key.
     *
     * After a successful send the payload is written to the trace log.
     *
     * @param exchange name of the exchange the message is sent to
     * @param event the change event to wrap and publish
     * @return the [QueueMessage] that was created for (and sent with) this event
     * @throws InternalServerException with code `EVENT_PUBLISH_ERROR` if sending fails
     *   with an [AmqpException]
     */
    @Throws(AamException::class)
    override fun publish(exchange: String, event: DocumentChangeEvent): QueueMessage {
        val message = QueueMessage(
            id = UUID.randomUUID(),
            eventType = DocumentChangeEvent::class.java.canonicalName,
            event = event,
            createdAt = currentUtcTimestamp(),
        )
        // Serialize exactly once and reuse the result for sending and logging. Building
        // the log argument with a second serialization would cost time even when TRACE
        // is disabled and could fail after the message has already been delivered.
        val payload = objectMapper.writeValueAsString(message)

        try {
            rabbitTemplate.convertAndSend(exchange, "", payload)
        } catch (ex: AmqpException) {
            throw InternalServerException(
                message = "Could not publish DocumentChangeEvent: $event",
                code = DefaultChangeEventPublisherErrorCode.EVENT_PUBLISH_ERROR,
                cause = ex
            )
        }

        logger.trace(
            "[DefaultDocumentChangeEventPublisher]: publish message to channel '{}' Payload: {}",
            exchange,
            payload
        )

        return message
    }

    /**
     * Current time in UTC, formatted with [DateTimeFormatter.ISO_LOCAL_DATE_TIME].
     * The resulting string therefore contains no offset/zone designator.
     */
    private fun currentUtcTimestamp(): String =
        Instant.now()
            .atOffset(ZoneOffset.UTC)
            .format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)
}