diff --git a/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/executor/ScheduledPartitionCleaner.kt b/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/executor/ScheduledPartitionCleaner.kt new file mode 100644 index 0000000..5921c63 --- /dev/null +++ b/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/executor/ScheduledPartitionCleaner.kt @@ -0,0 +1,29 @@ +package com.wavesenterprise.we.tx.observer.core.spring.executor + +import com.wavesenterprise.we.tx.observer.core.spring.properties.PartitionCleanerConfig +import com.wavesenterprise.we.tx.observer.jpa.repository.TxQueuePartitionJpaRepository +import net.javacrumbs.shedlock.spring.annotation.SchedulerLock +import org.slf4j.Logger +import org.slf4j.LoggerFactory + +open class ScheduledPartitionCleaner( + val txQueuePartitionJpaRepository: TxQueuePartitionJpaRepository, + val partitionCleanerConfig: PartitionCleanerConfig, +) { + val logger: Logger = LoggerFactory.getLogger(ScheduledPartitionCleaner::class.java) + + @SchedulerLock( + name = "cleanEmptyPartitions_task", + ) + open fun cleanEmptyPartitions() { + logger.info("Cleaning empty partitions...") + var totalDeletedCount = 0 + do { + val deletedCount = txQueuePartitionJpaRepository.deleteEmptyPartitions( + limit = partitionCleanerConfig.batchSize, + ) + totalDeletedCount += deletedCount + } while (deletedCount > 0) + logger.info("Deleted $totalDeletedCount empty partitions") + } +} diff --git a/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/properties/PartitionCleanerConfig.kt b/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/properties/PartitionCleanerConfig.kt new file mode 100644 index 0000000..a83f4f0 --- /dev/null +++ b/we-tx-observer-module/we-tx-observer-core-spring/src/main/kotlin/com/wavesenterprise/we/tx/observer/core/spring/properties/PartitionCleanerConfig.kt @@ -0,0 +1,9 @@ +package com.wavesenterprise.we.tx.observer.core.spring.properties + +import java.time.Duration + +interface PartitionCleanerConfig { + var enabled: Boolean + var fixedDelay: Duration + var batchSize: Int +} diff --git a/we-tx-observer-module/we-tx-observer-jpa/src/main/kotlin/com/wavesenterprise/we/tx/observer/jpa/repository/TxQueuePartitionJpaRepository.kt b/we-tx-observer-module/we-tx-observer-jpa/src/main/kotlin/com/wavesenterprise/we/tx/observer/jpa/repository/TxQueuePartitionJpaRepository.kt index f8b5ead..3979f12 100644 --- a/we-tx-observer-module/we-tx-observer-jpa/src/main/kotlin/com/wavesenterprise/we/tx/observer/jpa/repository/TxQueuePartitionJpaRepository.kt +++ b/we-tx-observer-module/we-tx-observer-jpa/src/main/kotlin/com/wavesenterprise/we/tx/observer/jpa/repository/TxQueuePartitionJpaRepository.kt @@ -7,6 +7,7 @@ import org.springframework.data.jpa.repository.JpaSpecificationExecutor import org.springframework.data.jpa.repository.Modifying import org.springframework.data.jpa.repository.Query import org.springframework.stereotype.Repository +import org.springframework.transaction.annotation.Transactional @Repository interface TxQueuePartitionJpaRepository : @@ -135,6 +136,29 @@ interface TxQueuePartitionJpaRepository : ) @Modifying fun clearPausedOnTxIds(): Int + + @Query( + """ + with empty_partitions_batch + as ( + select tqp.id + from tx_observer.tx_queue_partition tqp + where not exists( + select * + from tx_observer.enqueued_tx etx + where etx.partition_id = tqp.id + ) + limit :limit + ) + delete + from tx_observer.tx_queue_partition tqp + using empty_partitions_batch where tqp.id = empty_partitions_batch.id; + """, + nativeQuery = true, + ) + @Modifying + @Transactional + fun deleteEmptyPartitions(limit: Int): Int } const val STUCK_PARTITION_PRIORITY_THRESHOLD: Int = -100 diff --git a/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/JpaExecutorsConfig.kt b/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/JpaExecutorsConfig.kt index 0f13d80..167e488 100644 --- a/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/JpaExecutorsConfig.kt +++ b/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/JpaExecutorsConfig.kt @@ -17,5 +17,6 @@ import org.springframework.context.annotation.Import TxObserverSchedulerConfig::class, LockConfig::class, PartitionPausedOnTxIdCleanerConfig::class, + PartitionCleanerConfig::class, ) class JpaExecutorsConfig diff --git a/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/PartitionCleanerConfig.kt b/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/PartitionCleanerConfig.kt new file mode 100644 index 0000000..a5cd0c5 --- /dev/null +++ b/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/PartitionCleanerConfig.kt @@ -0,0 +1,27 @@ +package com.wavesenterprise.we.tx.observer.starter + +import com.wavesenterprise.we.tx.observer.core.spring.executor.ScheduledPartitionCleaner +import com.wavesenterprise.we.tx.observer.core.spring.properties.PartitionCleanerConfig +import com.wavesenterprise.we.tx.observer.jpa.config.TxObserverJpaConfig +import com.wavesenterprise.we.tx.observer.jpa.repository.TxQueuePartitionJpaRepository +import com.wavesenterprise.we.tx.observer.starter.properties.PartitionCleanerProperties +import org.springframework.boot.context.properties.EnableConfigurationProperties +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.context.annotation.Import + +@Configuration +@Import(TxObserverJpaConfig::class) +@EnableConfigurationProperties(PartitionCleanerProperties::class) +class PartitionCleanerConfig { + + @Bean + fun scheduledPartitionCleaner( + txQueuePartitionJpaRepository: TxQueuePartitionJpaRepository, + partitionCleanerConfig: PartitionCleanerConfig, + ) = + ScheduledPartitionCleaner( + txQueuePartitionJpaRepository = txQueuePartitionJpaRepository, + partitionCleanerConfig = partitionCleanerConfig, + ) +} diff --git a/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/TxObserverSchedulerConfig.kt b/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/TxObserverSchedulerConfig.kt index 3314720..977ca45 100644 --- a/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/TxObserverSchedulerConfig.kt +++ b/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/TxObserverSchedulerConfig.kt @@ -1,6 +1,7 @@ package com.wavesenterprise.we.tx.observer.starter import com.wavesenterprise.we.tx.observer.core.spring.executor.ScheduledForkResolver +import com.wavesenterprise.we.tx.observer.core.spring.executor.ScheduledPartitionCleaner import com.wavesenterprise.we.tx.observer.core.spring.executor.ScheduledPartitionPausedOnTxIdCleaner import com.wavesenterprise.we.tx.observer.core.spring.executor.ScheduledPartitionPoller import com.wavesenterprise.we.tx.observer.core.spring.executor.ScheduledPrivacyChecker @@ -11,6 +12,7 @@ import com.wavesenterprise.we.tx.observer.core.spring.executor.syncinfo.BlockHis import com.wavesenterprise.we.tx.observer.core.spring.metrics.ScheduledMetricsCollector import com.wavesenterprise.we.tx.observer.starter.properties.ForkResolverProperties import com.wavesenterprise.we.tx.observer.starter.properties.MetricsCollectorProperties +import com.wavesenterprise.we.tx.observer.starter.properties.PartitionCleanerProperties import com.wavesenterprise.we.tx.observer.starter.properties.PartitionPausedOnTxIdCleanerProperties import com.wavesenterprise.we.tx.observer.starter.properties.PartitionPollerProperties import com.wavesenterprise.we.tx.observer.starter.properties.PrivacyAvailabilityCheckProperties @@ -42,6 +44,7 @@ import kotlin.reflect.jvm.javaMethod ForkResolverConfig::class, MetricsCollectorConfig::class, PartitionPausedOnTxIdCleanerConfig::class, + PartitionCleanerConfig::class, ) @EnableScheduling @EnableConfigurationProperties(TxObserverSchedulerProperties::class) @@ -82,6 +85,12 @@ class TxObserverSchedulerConfig { @Autowired lateinit var partitionPausedOnTxIdCleanerProperties: PartitionPausedOnTxIdCleanerProperties + @Autowired + lateinit var scheduledPartitionCleaner: ScheduledPartitionCleaner + + @Autowired + lateinit var partitionCleanerProperties: PartitionCleanerProperties + @Autowired lateinit var queueCleanerProperties: QueueCleanerProperties @@ -118,6 +127,7 @@ class TxObserverSchedulerConfig { addQueueCleaner() addScheduledMetricsCollector() addPartitionPausedOnTxIdCleaner() + addPartitionCleaner() } private fun ThreadPoolTaskScheduler.addScheduledMetricsCollector() { @@ -226,6 +236,18 @@ class TxObserverSchedulerConfig { } } + private fun ThreadPoolTaskScheduler.addPartitionCleaner() { + if (partitionCleanerProperties.enabled) { + schedule( + scheduledMethodRunnable( + scheduledPartitionCleaner, + ScheduledPartitionCleaner::cleanEmptyPartitions + ), + PeriodicTrigger(partitionCleanerProperties.fixedDelay.toMillis()) + ) + } + } + companion object { private inline fun scheduledMethodRunnable( scheduled: T, diff --git a/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/properties/PartitionCleanerProperties.kt b/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/properties/PartitionCleanerProperties.kt new file mode 100644 index 0000000..02691ad --- /dev/null +++ b/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/properties/PartitionCleanerProperties.kt @@ -0,0 +1,18 @@ +package com.wavesenterprise.we.tx.observer.starter.properties + +import com.wavesenterprise.we.tx.observer.core.spring.properties.PartitionCleanerConfig +import org.springframework.boot.context.properties.ConfigurationProperties +import org.springframework.boot.context.properties.ConstructorBinding +import org.springframework.boot.context.properties.bind.DefaultValue +import java.time.Duration + +@ConfigurationProperties("tx-observer.partition-cleaner") +@ConstructorBinding +data class PartitionCleanerProperties( + @DefaultValue("true") + override var enabled: Boolean, + @DefaultValue("5m") + override var fixedDelay: Duration, + @DefaultValue("100") + override var batchSize: Int, +) : PartitionCleanerConfig diff --git a/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/properties/PartitionPollerProperties.kt b/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/properties/PartitionPollerProperties.kt index 7cbffd1..086d054 100644 --- a/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/properties/PartitionPollerProperties.kt +++ b/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/properties/PartitionPollerProperties.kt @@ -4,16 +4,14 @@ import com.wavesenterprise.we.tx.observer.core.spring.properties.PartitionPoller import org.springframework.boot.context.properties.ConfigurationProperties import org.springframework.boot.context.properties.ConstructorBinding import org.springframework.boot.context.properties.bind.DefaultValue -import org.springframework.boot.convert.DurationUnit import java.time.Duration -import java.time.temporal.ChronoUnit @ConfigurationProperties("tx-observer.partition-poller") @ConstructorBinding data class PartitionPollerProperties( @DefaultValue("true") override var enabled: Boolean, - @DurationUnit(ChronoUnit.MILLIS) @DefaultValue("500ms") + @DefaultValue("50ms") override var fixedDelay: Duration, @DefaultValue("4") override var threadCount: Int, diff --git a/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/properties/TxObserverProperties.kt b/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/properties/TxObserverProperties.kt index 260540b..4ffde48 100644 --- a/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/properties/TxObserverProperties.kt +++ b/we-tx-observer-module/we-tx-observer-starter/src/main/kotlin/com/wavesenterprise/we/tx/observer/starter/properties/TxObserverProperties.kt @@ -17,7 +17,7 @@ data class TxObserverProperties( @DefaultValue("true") override var enabled: Boolean, override var queueMode: String = "JPA", - @DurationUnit(ChronoUnit.MILLIS) @DefaultValue("200ms") + @DefaultValue("50ms") override var fixedDelay: Duration, @DataSizeUnit(DataUnit.MEGABYTES) override var blockSizeWindow: DataSize = DataSize.ofMegabytes(10), diff --git a/we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/executor/ScheduledPartitionCleanerTest.kt b/we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/executor/ScheduledPartitionCleanerTest.kt new file mode 100644 index 0000000..1453c32 --- /dev/null +++ b/we-tx-observer-module/we-tx-observer-starter/src/test/kotlin/com/wavesenterprise/we/tx/observer/starter/observer/executor/ScheduledPartitionCleanerTest.kt @@ -0,0 +1,92 @@ +package com.wavesenterprise.we.tx.observer.starter.observer.executor + +import com.wavesenterprise.sdk.node.client.http.tx.AtomicInnerTxDto.Companion.toDto +import com.wavesenterprise.sdk.node.test.data.TestDataFactory +import com.wavesenterprise.we.flyway.starter.FlywaySchemaConfiguration +import com.wavesenterprise.we.tx.observer.domain.TxQueuePartition +import com.wavesenterprise.we.tx.observer.jpa.TxObserverJpaAutoConfig +import com.wavesenterprise.we.tx.observer.jpa.config.TxObserverJpaConfig +import com.wavesenterprise.we.tx.observer.jpa.repository.EnqueuedTxJpaRepository +import com.wavesenterprise.we.tx.observer.jpa.repository.TxQueuePartitionJpaRepository +import com.wavesenterprise.we.tx.observer.starter.TxObserverStarterConfig +import com.wavesenterprise.we.tx.observer.starter.observer.config.NodeBlockingServiceFactoryMockConfiguration +import com.wavesenterprise.we.tx.observer.starter.observer.config.ObjectMapperConfig +import com.wavesenterprise.we.tx.observer.starter.observer.util.ModelFactory +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertFalse +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.Test +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration +import org.springframework.boot.test.autoconfigure.jdbc.AutoConfigureTestDatabase +import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest +import org.springframework.test.context.ActiveProfiles +import org.springframework.test.context.ContextConfiguration +import javax.persistence.EntityManager +import javax.persistence.PersistenceContext + +@DataJpaTest +@ActiveProfiles("test") +@ContextConfiguration( + classes = [ + ObjectMapperConfig::class, + DataSourceAutoConfiguration::class, + NodeBlockingServiceFactoryMockConfiguration::class, + TxObserverJpaAutoConfig::class, + TxObserverStarterConfig::class, + FlywaySchemaConfiguration::class, + TxObserverJpaConfig::class, + ] +) +@AutoConfigureTestDatabase(replace = AutoConfigureTestDatabase.Replace.NONE) +class ScheduledPartitionCleanerTest { + + @Autowired + lateinit var txQueuePartitionJpaRepository: TxQueuePartitionJpaRepository + + @Autowired + lateinit var enqueuedTxJpaRepository: EnqueuedTxJpaRepository + + @PersistenceContext + lateinit var em: EntityManager + + @Test + fun `should delete empty partitions`() { + val batchSize = 1 + val activePartition = + TxQueuePartition( + id = "activePartitionId", + priority = 0, + ) + val inactivePartition = + TxQueuePartition( + id = "inactivePartitionId", + priority = 0, + ) + val tx1 = ModelFactory.enqueuedTx( + tx = TestDataFactory.callContractTx().toDto(), + partition = activePartition, + ) + val tx2 = ModelFactory.enqueuedTx( + tx = TestDataFactory.callContractTx().toDto(), + partition = activePartition, + ) + txQueuePartitionJpaRepository.saveAndFlush(activePartition) + txQueuePartitionJpaRepository.saveAndFlush(inactivePartition) + enqueuedTxJpaRepository.saveAndFlush(tx1) + enqueuedTxJpaRepository.saveAndFlush(tx2) + em.flushAndClear() + + val deletedCount = txQueuePartitionJpaRepository.deleteEmptyPartitions(limit = batchSize) + em.flushAndClear() + + assertEquals(1, deletedCount) + assertTrue(txQueuePartitionJpaRepository.existsById(activePartition.id)) + assertFalse(txQueuePartitionJpaRepository.existsById(inactivePartition.id)) + } + + private fun EntityManager.flushAndClear() { + flush() + clear() + } +}