diff --git a/build.gradle.kts b/build.gradle.kts index 8f696146..1c75e3d5 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -24,7 +24,7 @@ apply(plugin = "io.github.gradle-nexus.publish-plugin") allprojects { group = "com.github.jasync-sql" - version = "2.1.20" + version = "2.1.23" apply(plugin = "kotlin") apply(plugin = "maven-publish") diff --git a/r2dbc-mysql/src/main/java/JasyncClientConnection.kt b/r2dbc-mysql/src/main/java/JasyncClientConnection.kt index c455a018..112a2342 100644 --- a/r2dbc-mysql/src/main/java/JasyncClientConnection.kt +++ b/r2dbc-mysql/src/main/java/JasyncClientConnection.kt @@ -2,7 +2,6 @@ package com.github.jasync.r2dbc.mysql import com.github.jasync.sql.db.mysql.MySQLConnection import com.github.jasync.sql.db.mysql.pool.MySQLConnectionFactory -import com.github.jasync.sql.db.util.flatMap import com.github.jasync.sql.db.util.map import io.r2dbc.spi.Batch import io.r2dbc.spi.Connection @@ -44,18 +43,21 @@ class JasyncClientConnection( override fun beginTransaction(definition: TransactionDefinition): Publisher { return Mono.defer { - var future = jasyncConnection.sendQuery("START TRANSACTION") - definition.getAttribute(TransactionDefinition.ISOLATION_LEVEL)?.let { isolationLevel -> - future = - future.flatMap { jasyncConnection.sendQuery("SET TRANSACTION ISOLATION LEVEL " + isolationLevel.asSql()) } - .map { this.isolationLevel = isolationLevel; it } - } - definition.getAttribute(TransactionDefinition.LOCK_WAIT_TIMEOUT)?.let { timeout -> - future = - future.flatMap { jasyncConnection.sendQuery("SET innodb_lock_wait_timeout=${timeout.seconds}") } - } - future = future.flatMap { jasyncConnection.sendQuery("SET AUTOCOMMIT = 0") } - future.toMono().then() + val setAutoCommit = Mono.from(setAutoCommit(false)) + + val setLockWaitTimeout = Mono.justOrEmpty(definition.getAttribute(TransactionDefinition.LOCK_WAIT_TIMEOUT)) + .flatMap { timeout -> Mono.from(setLockWaitTimeout(timeout)) } + + val changeIsolationLevel = Mono.justOrEmpty(definition.getAttribute(TransactionDefinition.ISOLATION_LEVEL)) + .flatMap { newIsolationLevel -> Mono.from(setTransactionIsolationLevel(newIsolationLevel)) } + + val startTransaction = Mono.from(beginTransaction()) + + return@defer Mono.from(setAutoCommit) + .then(setLockWaitTimeout) + .then(changeIsolationLevel) + .then(startTransaction) + .then() } } diff --git a/r2dbc-mysql/src/test/java/com/github/jasync/r2dbc/mysql/integ/R2dbcTransactionIntegrationTest.kt b/r2dbc-mysql/src/test/java/com/github/jasync/r2dbc/mysql/integ/R2dbcTransactionIntegrationTest.kt new file mode 100644 index 00000000..62c4d81f --- /dev/null +++ b/r2dbc-mysql/src/test/java/com/github/jasync/r2dbc/mysql/integ/R2dbcTransactionIntegrationTest.kt @@ -0,0 +1,185 @@ +package com.github.jasync.r2dbc.mysql.integ + +import com.github.jasync.r2dbc.mysql.JasyncConnectionFactory +import com.github.jasync.sql.db.mysql.MySQLConnection +import com.github.jasync.sql.db.mysql.pool.MySQLConnectionFactory +import com.github.jasync.sql.db.util.FP +import com.github.jasync.sql.db.util.isCompleted +import io.mockk.mockk +import io.r2dbc.pool.ConnectionPoolConfiguration +import io.r2dbc.spi.IsolationLevel +import io.r2dbc.spi.Option +import io.r2dbc.spi.TransactionDefinition +import org.awaitility.kotlin.await +import org.junit.Test +import org.springframework.r2dbc.connection.R2dbcTransactionManager +import org.springframework.r2dbc.connection.TransactionAwareConnectionFactoryProxy +import org.springframework.transaction.reactive.TransactionalOperator +import org.springframework.transaction.support.DefaultTransactionDefinition +import reactor.core.publisher.Mono +import java.time.Duration +import java.util.concurrent.CompletableFuture +import kotlin.test.assertEquals + +class R2dbcTransactionIntegrationTest : R2dbcConnectionHelper() { + + @Test + fun `verify read committed transaction isolation level`() { + withConnection { c -> + executeQuery(c, createTableNumericColumns) + executeQuery(c, insertTableNumericColumns) + val mycf = object : MySQLConnectionFactory(mockk()) { + override fun create(): CompletableFuture { + return FP.successful(c) + } + } + val cf = JasyncConnectionFactory(mycf) + + val result = CompletableFuture() + + Mono.from(cf.create()) + .flatMap { connection -> + Mono.from(connection.beginTransaction(ExtendedTransactionDefinition(isolationLevel = IsolationLevel.READ_COMMITTED))) + .then( + Mono.from( + connection + .createStatement("SELECT COUNT(*) FROM numbers") + .execute() + ) + ) + } + .flatMap { Mono.from(it.map { row, _ -> row.get("COUNT(*)") as Long }) } + .subscribe( + { countResult -> result.complete(countResult) }, + { throwable -> result.completeExceptionally(throwable) } + ) + + await.untilAsserted { + assert(result.isCompleted) + assertEquals(1, result.get()) + } + } + } + + @Test + fun `verify read commited transaction isolation level (spring tx manager)`() { + val timeout = 3L + val timeoutConfiguration = getConfiguration().copy(queryTimeout = Duration.ofSeconds(timeout)) + + withConfigurableConnection(timeoutConfiguration) { c -> + + executeQuery(c, createTableNumericColumns) + executeQuery(c, insertTableNumericColumns) + + val mycf = object : MySQLConnectionFactory(mockk()) { + override fun create(): CompletableFuture { + return FP.successful(c) + } + } + val cf = JasyncConnectionFactory(mycf) + val r2dbcPoolConfig = ConnectionPoolConfiguration.builder() + .initialSize(5) + .minIdle(5) + .connectionFactory(cf) + .build() + + val r2dbcPool = io.r2dbc.pool.ConnectionPool(r2dbcPoolConfig) + val tm = R2dbcTransactionManager(r2dbcPool) + val transactionalOperator = TransactionalOperator.create( + tm, + DefaultTransactionDefinition().apply { + isolationLevel = org.springframework.transaction.TransactionDefinition.ISOLATION_READ_COMMITTED + } + ) + + val tcf = TransactionAwareConnectionFactoryProxy(r2dbcPool) + + val result = CompletableFuture() + + Mono.from(tcf.create()) + .flatMap { connection -> + Mono.from( + connection + .createStatement("SELECT COUNT(*) FROM numbers") + .execute() + ) + } + .flatMap { Mono.from(it.map { row, _ -> row.get("COUNT(*)") as Long }) } + .`as`(transactionalOperator::transactional) + .subscribe( + { countResult -> result.complete(countResult) }, + { throwable -> result.completeExceptionally(throwable) } + ) + + await.untilAsserted { + assert(result.isCompleted) + assertEquals(1, result.get()) + } + } + } + + @Test + fun `verify default transaction isolation level`() { + withConnection { c -> + executeQuery(c, createTableNumericColumns) + executeQuery(c, insertTableNumericColumns) + val mycf = object : MySQLConnectionFactory(mockk()) { + override fun create(): CompletableFuture { + return FP.successful(c) + } + } + val cf = JasyncConnectionFactory(mycf) + + val result = CompletableFuture() + + Mono.from(cf.create()) + .flatMap { connection -> + Mono.from(connection.beginTransaction(ExtendedTransactionDefinition(isolationLevel = null))) + .then( + Mono.from( + connection + .createStatement("SELECT COUNT(*) FROM numbers") + .execute() + ) + ) + } + .flatMap { Mono.from(it.map { row, _ -> row.get("COUNT(*)") as Long }) } + .subscribe( + { countResult -> result.complete(countResult) }, + { throwable -> result.completeExceptionally(throwable) } + ) + + await.untilAsserted { + assert(result.isCompleted) + assertEquals(1, result.get()) + } + } + } + + private class ExtendedTransactionDefinition constructor( + private val transactionName: String? = null, + private val readOnly: Boolean = false, + private val isolationLevel: IsolationLevel? = null, + private val lockWaitTimeout: Duration = Duration.ofMillis(0) + ) : TransactionDefinition { + + override fun getAttribute(option: Option): T { + return doGetValue(option) as T + } + + private fun doGetValue(option: Option<*>): Any? { + if (TransactionDefinition.ISOLATION_LEVEL == option) { + return this.isolationLevel + } + if (TransactionDefinition.NAME == option) { + return this.transactionName + } + if (TransactionDefinition.READ_ONLY == option) { + return this.readOnly + } + return if (TransactionDefinition.LOCK_WAIT_TIMEOUT == option && !this.lockWaitTimeout.isZero) { + this.lockWaitTimeout + } else null + } + } +}