Skip to content

Commit

Permalink
Merge branch 'release/1.3.0'
Browse files Browse the repository at this point in the history
  • Loading branch information
dgridnev committed May 16, 2024
2 parents dc6172f + 3c68949 commit 87c459e
Show file tree
Hide file tree
Showing 10 changed files with 224 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.wavesenterprise.we.tx.observer.core.spring.executor

import com.wavesenterprise.we.tx.observer.core.spring.properties.PartitionCleanerConfig
import com.wavesenterprise.we.tx.observer.jpa.repository.TxQueuePartitionJpaRepository
import net.javacrumbs.shedlock.spring.annotation.SchedulerLock
import org.slf4j.Logger
import org.slf4j.LoggerFactory

open class ScheduledPartitionCleaner(
val txQueuePartitionJpaRepository: TxQueuePartitionJpaRepository,
val partitionCleanerConfig: PartitionCleanerConfig,
) {
val logger: Logger = LoggerFactory.getLogger(ScheduledPartitionCleaner::class.java)

@SchedulerLock(
name = "cleanEmptyPartitions_task",
)
open fun cleanEmptyPartitions() {
logger.info("Cleaning empty partitions...")
var totalDeletedCount = 0
do {
val deletedCount = txQueuePartitionJpaRepository.deleteEmptyPartitions(
limit = partitionCleanerConfig.batchSize,
)
totalDeletedCount += deletedCount
} while (deletedCount > 0)
logger.info("Deleted $totalDeletedCount empty partitions")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.wavesenterprise.we.tx.observer.core.spring.properties

import java.time.Duration

interface PartitionCleanerConfig {
var enabled: Boolean
var fixedDelay: Duration
var batchSize: Int
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import org.springframework.data.jpa.repository.JpaSpecificationExecutor
import org.springframework.data.jpa.repository.Modifying
import org.springframework.data.jpa.repository.Query
import org.springframework.stereotype.Repository
import org.springframework.transaction.annotation.Transactional

@Repository
interface TxQueuePartitionJpaRepository :
Expand Down Expand Up @@ -135,6 +136,29 @@ interface TxQueuePartitionJpaRepository :
)
@Modifying
fun clearPausedOnTxIds(): Int

@Query(
"""
with empty_partitions_batch
as (
select tqp.id
from tx_observer.tx_queue_partition tqp
where not exists(
select *
from tx_observer.enqueued_tx etx
where etx.partition_id = tqp.id
)
limit :limit
)
delete
from tx_observer.tx_queue_partition tqp
using empty_partitions_batch where tqp.id = empty_partitions_batch.id;
""",
nativeQuery = true,
)
@Modifying
@Transactional
fun deleteEmptyPartitions(limit: Int): Int
}

const val STUCK_PARTITION_PRIORITY_THRESHOLD: Int = -100
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@ import org.springframework.context.annotation.Import
TxObserverSchedulerConfig::class,
LockConfig::class,
PartitionPausedOnTxIdCleanerConfig::class,
PartitionCleanerConfig::class,
)
class JpaExecutorsConfig
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.wavesenterprise.we.tx.observer.starter

import com.wavesenterprise.we.tx.observer.core.spring.executor.ScheduledPartitionCleaner
import com.wavesenterprise.we.tx.observer.core.spring.properties.PartitionCleanerConfig
import com.wavesenterprise.we.tx.observer.jpa.config.TxObserverJpaConfig
import com.wavesenterprise.we.tx.observer.jpa.repository.TxQueuePartitionJpaRepository
import com.wavesenterprise.we.tx.observer.starter.properties.PartitionCleanerProperties
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.Import

@Configuration
@Import(TxObserverJpaConfig::class)
@EnableConfigurationProperties(PartitionCleanerProperties::class)
class PartitionCleanerConfig {

@Bean
fun scheduledPartitionCleaner(
txQueuePartitionJpaRepository: TxQueuePartitionJpaRepository,
partitionCleanerConfig: PartitionCleanerConfig,
) =
ScheduledPartitionCleaner(
txQueuePartitionJpaRepository = txQueuePartitionJpaRepository,
partitionCleanerConfig = partitionCleanerConfig,
)
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.wavesenterprise.we.tx.observer.starter

import com.wavesenterprise.we.tx.observer.core.spring.executor.ScheduledForkResolver
import com.wavesenterprise.we.tx.observer.core.spring.executor.ScheduledPartitionCleaner
import com.wavesenterprise.we.tx.observer.core.spring.executor.ScheduledPartitionPausedOnTxIdCleaner
import com.wavesenterprise.we.tx.observer.core.spring.executor.ScheduledPartitionPoller
import com.wavesenterprise.we.tx.observer.core.spring.executor.ScheduledPrivacyChecker
Expand All @@ -11,6 +12,7 @@ import com.wavesenterprise.we.tx.observer.core.spring.executor.syncinfo.BlockHis
import com.wavesenterprise.we.tx.observer.core.spring.metrics.ScheduledMetricsCollector
import com.wavesenterprise.we.tx.observer.starter.properties.ForkResolverProperties
import com.wavesenterprise.we.tx.observer.starter.properties.MetricsCollectorProperties
import com.wavesenterprise.we.tx.observer.starter.properties.PartitionCleanerProperties
import com.wavesenterprise.we.tx.observer.starter.properties.PartitionPausedOnTxIdCleanerProperties
import com.wavesenterprise.we.tx.observer.starter.properties.PartitionPollerProperties
import com.wavesenterprise.we.tx.observer.starter.properties.PrivacyAvailabilityCheckProperties
Expand Down Expand Up @@ -42,6 +44,7 @@ import kotlin.reflect.jvm.javaMethod
ForkResolverConfig::class,
MetricsCollectorConfig::class,
PartitionPausedOnTxIdCleanerConfig::class,
PartitionCleanerConfig::class,
)
@EnableScheduling
@EnableConfigurationProperties(TxObserverSchedulerProperties::class)
Expand Down Expand Up @@ -82,6 +85,12 @@ class TxObserverSchedulerConfig {
@Autowired
lateinit var partitionPausedOnTxIdCleanerProperties: PartitionPausedOnTxIdCleanerProperties

@Autowired
lateinit var scheduledPartitionCleaner: ScheduledPartitionCleaner

@Autowired
lateinit var partitionCleanerProperties: PartitionCleanerProperties

@Autowired
lateinit var queueCleanerProperties: QueueCleanerProperties

Expand Down Expand Up @@ -118,6 +127,7 @@ class TxObserverSchedulerConfig {
addQueueCleaner()
addScheduledMetricsCollector()
addPartitionPausedOnTxIdCleaner()
addPartitionCleaner()
}

private fun ThreadPoolTaskScheduler.addScheduledMetricsCollector() {
Expand Down Expand Up @@ -226,6 +236,18 @@ class TxObserverSchedulerConfig {
}
}

private fun ThreadPoolTaskScheduler.addPartitionCleaner() {
if (partitionCleanerProperties.enabled) {
schedule(
scheduledMethodRunnable(
scheduledPartitionCleaner,
ScheduledPartitionCleaner::cleanEmptyPartitions
),
PeriodicTrigger(partitionCleanerProperties.fixedDelay.toMillis())
)
}
}

companion object {
private inline fun <reified T : Any> scheduledMethodRunnable(
scheduled: T,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.wavesenterprise.we.tx.observer.starter.properties

import com.wavesenterprise.we.tx.observer.core.spring.properties.PartitionCleanerConfig
import org.springframework.boot.context.properties.ConfigurationProperties
import org.springframework.boot.context.properties.ConstructorBinding
import org.springframework.boot.context.properties.bind.DefaultValue
import java.time.Duration

@ConfigurationProperties("tx-observer.partition-cleaner")
@ConstructorBinding
data class PartitionCleanerProperties(
@DefaultValue("true")
override var enabled: Boolean,
@DefaultValue("5m")
override var fixedDelay: Duration,
@DefaultValue("100")
override var batchSize: Int,
) : PartitionCleanerConfig
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,14 @@ import com.wavesenterprise.we.tx.observer.core.spring.properties.PartitionPoller
import org.springframework.boot.context.properties.ConfigurationProperties
import org.springframework.boot.context.properties.ConstructorBinding
import org.springframework.boot.context.properties.bind.DefaultValue
import org.springframework.boot.convert.DurationUnit
import java.time.Duration
import java.time.temporal.ChronoUnit

@ConfigurationProperties("tx-observer.partition-poller")
@ConstructorBinding
data class PartitionPollerProperties(
@DefaultValue("true")
override var enabled: Boolean,
@DurationUnit(ChronoUnit.MILLIS) @DefaultValue("500ms")
@DefaultValue("50ms")
override var fixedDelay: Duration,
@DefaultValue("4")
override var threadCount: Int,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ data class TxObserverProperties(
@DefaultValue("true")
override var enabled: Boolean,
override var queueMode: String = "JPA",
@DurationUnit(ChronoUnit.MILLIS) @DefaultValue("200ms")
@DefaultValue("50ms")
override var fixedDelay: Duration,
@DataSizeUnit(DataUnit.MEGABYTES)
override var blockSizeWindow: DataSize = DataSize.ofMegabytes(10),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package com.wavesenterprise.we.tx.observer.starter.observer.executor

import com.wavesenterprise.sdk.node.client.http.tx.AtomicInnerTxDto.Companion.toDto
import com.wavesenterprise.sdk.node.test.data.TestDataFactory
import com.wavesenterprise.we.flyway.starter.FlywaySchemaConfiguration
import com.wavesenterprise.we.tx.observer.domain.TxQueuePartition
import com.wavesenterprise.we.tx.observer.jpa.TxObserverJpaAutoConfig
import com.wavesenterprise.we.tx.observer.jpa.config.TxObserverJpaConfig
import com.wavesenterprise.we.tx.observer.jpa.repository.EnqueuedTxJpaRepository
import com.wavesenterprise.we.tx.observer.jpa.repository.TxQueuePartitionJpaRepository
import com.wavesenterprise.we.tx.observer.starter.TxObserverStarterConfig
import com.wavesenterprise.we.tx.observer.starter.observer.config.NodeBlockingServiceFactoryMockConfiguration
import com.wavesenterprise.we.tx.observer.starter.observer.config.ObjectMapperConfig
import com.wavesenterprise.we.tx.observer.starter.observer.util.ModelFactory
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertFalse
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Test
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration
import org.springframework.boot.test.autoconfigure.jdbc.AutoConfigureTestDatabase
import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest
import org.springframework.test.context.ActiveProfiles
import org.springframework.test.context.ContextConfiguration
import javax.persistence.EntityManager
import javax.persistence.PersistenceContext

@DataJpaTest
@ActiveProfiles("test")
@ContextConfiguration(
classes = [
ObjectMapperConfig::class,
DataSourceAutoConfiguration::class,
NodeBlockingServiceFactoryMockConfiguration::class,
TxObserverJpaAutoConfig::class,
TxObserverStarterConfig::class,
FlywaySchemaConfiguration::class,
TxObserverJpaConfig::class,
]
)
@AutoConfigureTestDatabase(replace = AutoConfigureTestDatabase.Replace.NONE)
class ScheduledPartitionCleanerTest {

@Autowired
lateinit var txQueuePartitionJpaRepository: TxQueuePartitionJpaRepository

@Autowired
lateinit var enqueuedTxJpaRepository: EnqueuedTxJpaRepository

@PersistenceContext
lateinit var em: EntityManager

@Test
fun `should delete empty partitions`() {
val batchSize = 1
val activePartition =
TxQueuePartition(
id = "activePartitionId",
priority = 0,
)
val inactivePartition =
TxQueuePartition(
id = "inactivePartitionId",
priority = 0,
)
val tx1 = ModelFactory.enqueuedTx(
tx = TestDataFactory.callContractTx().toDto(),
partition = activePartition,
)
val tx2 = ModelFactory.enqueuedTx(
tx = TestDataFactory.callContractTx().toDto(),
partition = activePartition,
)
txQueuePartitionJpaRepository.saveAndFlush(activePartition)
txQueuePartitionJpaRepository.saveAndFlush(inactivePartition)
enqueuedTxJpaRepository.saveAndFlush(tx1)
enqueuedTxJpaRepository.saveAndFlush(tx2)
em.flushAndClear()

val deletedCount = txQueuePartitionJpaRepository.deleteEmptyPartitions(limit = batchSize)
em.flushAndClear()

assertEquals(1, deletedCount)
assertTrue(txQueuePartitionJpaRepository.existsById(activePartition.id))
assertFalse(txQueuePartitionJpaRepository.existsById(inactivePartition.id))
}

private fun EntityManager.flushAndClear() {
flush()
clear()
}
}

0 comments on commit 87c459e

Please sign in to comment.