Skip to content

Commit c9cffe1

Browse files
Nikita Turchinoshai
authored andcommitted
add tx isolation level test
1 parent 37be414 commit c9cffe1

File tree

2 files changed

+200
-13
lines changed

2 files changed

+200
-13
lines changed

r2dbc-mysql/src/main/java/JasyncClientConnection.kt

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package com.github.jasync.r2dbc.mysql
22

33
import com.github.jasync.sql.db.mysql.MySQLConnection
44
import com.github.jasync.sql.db.mysql.pool.MySQLConnectionFactory
5-
import com.github.jasync.sql.db.util.flatMap
65
import com.github.jasync.sql.db.util.map
76
import io.r2dbc.spi.Batch
87
import io.r2dbc.spi.Connection
@@ -44,18 +43,21 @@ class JasyncClientConnection(
4443

4544
override fun beginTransaction(definition: TransactionDefinition): Publisher<Void> {
4645
return Mono.defer {
47-
var future = jasyncConnection.sendQuery("START TRANSACTION")
48-
definition.getAttribute(TransactionDefinition.ISOLATION_LEVEL)?.let { isolationLevel ->
49-
future =
50-
future.flatMap { jasyncConnection.sendQuery("SET TRANSACTION ISOLATION LEVEL " + isolationLevel.asSql()) }
51-
.map { this.isolationLevel = isolationLevel; it }
52-
}
53-
definition.getAttribute(TransactionDefinition.LOCK_WAIT_TIMEOUT)?.let { timeout ->
54-
future =
55-
future.flatMap { jasyncConnection.sendQuery("SET innodb_lock_wait_timeout=${timeout.seconds}") }
56-
}
57-
future = future.flatMap { jasyncConnection.sendQuery("SET AUTOCOMMIT = 0") }
58-
future.toMono().then()
46+
val setAutoCommit = Mono.from(setAutoCommit(false))
47+
48+
val setLockWaitTimeout = Mono.justOrEmpty(definition.getAttribute(TransactionDefinition.LOCK_WAIT_TIMEOUT))
49+
.flatMap { timeout -> Mono.from(setLockWaitTimeout(timeout)) }
50+
51+
val changeIsolationLevel = Mono.justOrEmpty(definition.getAttribute(TransactionDefinition.ISOLATION_LEVEL))
52+
.flatMap { newIsolationLevel -> Mono.from(setTransactionIsolationLevel(newIsolationLevel)) }
53+
54+
val startTransaction = Mono.from(beginTransaction())
55+
56+
return@defer Mono.from(setAutoCommit)
57+
.then(setLockWaitTimeout)
58+
.then(changeIsolationLevel)
59+
.then(startTransaction)
60+
.then()
5961
}
6062
}
6163

Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
package com.github.jasync.r2dbc.mysql.integ
2+
3+
import com.github.jasync.r2dbc.mysql.JasyncConnectionFactory
4+
import com.github.jasync.sql.db.mysql.MySQLConnection
5+
import com.github.jasync.sql.db.mysql.pool.MySQLConnectionFactory
6+
import com.github.jasync.sql.db.util.FP
7+
import com.github.jasync.sql.db.util.isCompleted
8+
import io.mockk.mockk
9+
import io.r2dbc.pool.ConnectionPoolConfiguration
10+
import io.r2dbc.spi.IsolationLevel
11+
import io.r2dbc.spi.Option
12+
import io.r2dbc.spi.TransactionDefinition
13+
import org.awaitility.kotlin.await
14+
import org.junit.Test
15+
import org.springframework.r2dbc.connection.R2dbcTransactionManager
16+
import org.springframework.r2dbc.connection.TransactionAwareConnectionFactoryProxy
17+
import org.springframework.transaction.reactive.TransactionalOperator
18+
import org.springframework.transaction.support.DefaultTransactionDefinition
19+
import reactor.core.publisher.Mono
20+
import java.time.Duration
21+
import java.util.concurrent.CompletableFuture
22+
import kotlin.test.assertEquals
23+
24+
class R2dbcTransactionIntegrationTest : R2dbcConnectionHelper() {
25+
26+
@Test
27+
fun `verify read committed transaction isolation level`() {
28+
withConnection { c ->
29+
executeQuery(c, createTableNumericColumns)
30+
executeQuery(c, insertTableNumericColumns)
31+
val mycf = object : MySQLConnectionFactory(mockk()) {
32+
override fun create(): CompletableFuture<MySQLConnection> {
33+
return FP.successful(c)
34+
}
35+
}
36+
val cf = JasyncConnectionFactory(mycf)
37+
38+
val result = CompletableFuture<Long>()
39+
40+
Mono.from(cf.create())
41+
.flatMap { connection ->
42+
Mono.from(connection.beginTransaction(ExtendedTransactionDefinition(isolationLevel = IsolationLevel.READ_COMMITTED)))
43+
.then(
44+
Mono.from(
45+
connection
46+
.createStatement("SELECT COUNT(*) FROM numbers")
47+
.execute()
48+
)
49+
)
50+
}
51+
.flatMap { Mono.from(it.map { row, _ -> row.get("COUNT(*)") as Long }) }
52+
.subscribe(
53+
{ countResult -> result.complete(countResult) },
54+
{ throwable -> result.completeExceptionally(throwable) }
55+
)
56+
57+
await.untilAsserted {
58+
assert(result.isCompleted)
59+
assertEquals(1, result.get())
60+
}
61+
}
62+
}
63+
64+
@Test
65+
fun `verify read commited transaction isolation level (spring tx manager)`() {
66+
val timeout = 3L
67+
val timeoutConfiguration = getConfiguration().copy(queryTimeout = Duration.ofSeconds(timeout))
68+
69+
withConfigurableConnection(timeoutConfiguration) { c ->
70+
71+
executeQuery(c, createTableNumericColumns)
72+
executeQuery(c, insertTableNumericColumns)
73+
74+
val mycf = object : MySQLConnectionFactory(mockk()) {
75+
override fun create(): CompletableFuture<MySQLConnection> {
76+
return FP.successful(c)
77+
}
78+
}
79+
val cf = JasyncConnectionFactory(mycf)
80+
val r2dbcPoolConfig = ConnectionPoolConfiguration.builder()
81+
.initialSize(5)
82+
.minIdle(5)
83+
.connectionFactory(cf)
84+
.build()
85+
86+
val r2dbcPool = io.r2dbc.pool.ConnectionPool(r2dbcPoolConfig)
87+
val tm = R2dbcTransactionManager(r2dbcPool)
88+
val transactionalOperator = TransactionalOperator.create(
89+
tm,
90+
DefaultTransactionDefinition().apply {
91+
isolationLevel = org.springframework.transaction.TransactionDefinition.ISOLATION_READ_COMMITTED
92+
}
93+
)
94+
95+
val tcf = TransactionAwareConnectionFactoryProxy(r2dbcPool)
96+
97+
val result = CompletableFuture<Long>()
98+
99+
Mono.from(tcf.create())
100+
.flatMap { connection ->
101+
Mono.from(
102+
connection
103+
.createStatement("SELECT COUNT(*) FROM numbers")
104+
.execute()
105+
)
106+
}
107+
.flatMap { Mono.from(it.map { row, _ -> row.get("COUNT(*)") as Long }) }
108+
.`as`(transactionalOperator::transactional)
109+
.subscribe(
110+
{ countResult -> result.complete(countResult) },
111+
{ throwable -> result.completeExceptionally(throwable) }
112+
)
113+
114+
await.untilAsserted {
115+
assert(result.isCompleted)
116+
assertEquals(1, result.get())
117+
}
118+
}
119+
}
120+
121+
@Test
122+
fun `verify default transaction isolation level`() {
123+
withConnection { c ->
124+
executeQuery(c, createTableNumericColumns)
125+
executeQuery(c, insertTableNumericColumns)
126+
val mycf = object : MySQLConnectionFactory(mockk()) {
127+
override fun create(): CompletableFuture<MySQLConnection> {
128+
return FP.successful(c)
129+
}
130+
}
131+
val cf = JasyncConnectionFactory(mycf)
132+
133+
val result = CompletableFuture<Long>()
134+
135+
Mono.from(cf.create())
136+
.flatMap { connection ->
137+
Mono.from(connection.beginTransaction(ExtendedTransactionDefinition(isolationLevel = null)))
138+
.then(
139+
Mono.from(
140+
connection
141+
.createStatement("SELECT COUNT(*) FROM numbers")
142+
.execute()
143+
)
144+
)
145+
}
146+
.flatMap { Mono.from(it.map { row, _ -> row.get("COUNT(*)") as Long }) }
147+
.subscribe(
148+
{ countResult -> result.complete(countResult) },
149+
{ throwable -> result.completeExceptionally(throwable) }
150+
)
151+
152+
await.untilAsserted {
153+
assert(result.isCompleted)
154+
assertEquals(1, result.get())
155+
}
156+
}
157+
}
158+
159+
private class ExtendedTransactionDefinition constructor(
160+
private val transactionName: String? = null,
161+
private val readOnly: Boolean = false,
162+
private val isolationLevel: IsolationLevel? = null,
163+
private val lockWaitTimeout: Duration = Duration.ofMillis(0)
164+
) : TransactionDefinition {
165+
166+
override fun <T> getAttribute(option: Option<T>): T {
167+
return doGetValue(option) as T
168+
}
169+
170+
private fun doGetValue(option: Option<*>): Any? {
171+
if (TransactionDefinition.ISOLATION_LEVEL == option) {
172+
return this.isolationLevel
173+
}
174+
if (TransactionDefinition.NAME == option) {
175+
return this.transactionName
176+
}
177+
if (TransactionDefinition.READ_ONLY == option) {
178+
return this.readOnly
179+
}
180+
return if (TransactionDefinition.LOCK_WAIT_TIMEOUT == option && !this.lockWaitTimeout.isZero) {
181+
this.lockWaitTimeout
182+
} else null
183+
}
184+
}
185+
}

0 commit comments

Comments
 (0)