Skip to content

Commit fa1aadc

Browse files
authored
impl for r2dbc filter/flatmap (#356)
* impl for r2dbc filter/flatmap on operation create a new instance from result `JasyncSegmentResult` JasyncSegmentResult has a flux of segments which are rows + number of rows segment. filter and flatmap are operating on that flux. There is no error handling here as the original result don't have such concept. JasyncStatement.execute() either yields result or exception (exception is not a result). see #351 * add unit test for filter
1 parent 2fdfaab commit fa1aadc

File tree

3 files changed

+88
-7
lines changed

3 files changed

+88
-7
lines changed

r2dbc-mysql/src/main/java/JasyncResult.kt

Lines changed: 48 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package com.github.jasync.r2dbc.mysql
22

33
import com.github.jasync.sql.db.ResultSet
44
import io.r2dbc.spi.Result
5+
import io.r2dbc.spi.Result.RowSegment
6+
import io.r2dbc.spi.Row
57
import io.r2dbc.spi.RowMetadata
68
import org.reactivestreams.Publisher
79
import reactor.core.publisher.Flux
@@ -34,18 +36,60 @@ class JasyncResult(
3436

3537
override fun <T> map(mappingFunction: BiFunction<io.r2dbc.spi.Row, RowMetadata, out T>): Publisher<T> {
3638
return if (selectLastInsertId) {
37-
Mono.fromSupplier { mappingFunction.apply(JasyncInsertSyntheticRow(generatedKeyName, lastInsertId), JasyncInsertSyntheticMetadata(generatedKeyName)) }
39+
Mono.fromSupplier {
40+
mappingFunction.apply(
41+
JasyncInsertSyntheticRow(generatedKeyName, lastInsertId),
42+
JasyncInsertSyntheticMetadata(generatedKeyName)
43+
)
44+
}
3845
} else {
3946
Flux.fromIterable(resultSet)
40-
.map { mappingFunction.apply(JasyncRow(it), metadata) }
47+
.map { mappingFunction.apply(JasyncRow(it, metadata), metadata) }
4148
}
4249
}
4350

4451
override fun filter(filter: Predicate<Result.Segment>): Result {
45-
TODO("Not yet implemented")
52+
return JasyncSegmentResult(this).filter(filter)
4653
}
4754

4855
override fun <T : Any?> flatMap(mappingFunction: Function<Result.Segment, out Publisher<out T>>): Publisher<T> {
49-
TODO("Not yet implemented")
56+
return JasyncSegmentResult(this).flatMap(mappingFunction)
57+
}
58+
59+
class JasyncSegmentResult private constructor(
60+
private val segments: Flux<Result.Segment>,
61+
private val result: JasyncResult
62+
) : Result {
63+
constructor(result: JasyncResult) : this(
64+
Flux.concat(
65+
Flux.fromIterable(result.resultSet)
66+
.map { JasyncRow(it, result.metadata) },
67+
Flux.just(Result.UpdateCount { result.rowsAffected })
68+
),
69+
result
70+
)
71+
72+
override fun getRowsUpdated(): Publisher<Long> {
73+
return result.rowsUpdated
74+
}
75+
76+
override fun <T : Any?> map(mappingFunction: BiFunction<Row, RowMetadata, out T>): Publisher<T> {
77+
return segments
78+
.handle { segment, sink ->
79+
if (segment is RowSegment) {
80+
sink.next(mappingFunction.apply(segment.row(), segment.row().metadata))
81+
}
82+
}
83+
}
84+
85+
override fun filter(filter: Predicate<Result.Segment>): Result {
86+
return JasyncSegmentResult(segments.filter(filter), result)
87+
}
88+
89+
override fun <T : Any?> flatMap(mappingFunction: Function<Result.Segment, out Publisher<out T>>): Publisher<T> {
90+
return segments.concatMap { segment: Result.Segment ->
91+
mappingFunction.apply(segment)
92+
}
93+
}
5094
}
5195
}

r2dbc-mysql/src/main/java/JasyncRow.kt

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.github.jasync.r2dbc.mysql
22

33
import com.github.jasync.sql.db.RowData
4+
import io.r2dbc.spi.Result
45
import io.r2dbc.spi.Row
56
import io.r2dbc.spi.RowMetadata
67
import java.math.BigDecimal
@@ -9,7 +10,7 @@ import java.time.LocalDate
910
import java.time.LocalDateTime
1011
import java.time.LocalTime
1112

12-
class JasyncRow(private val rowData: RowData) : Row {
13+
class JasyncRow(private val rowData: RowData, private val metadata: JasyncMetadata) : Row, Result.RowSegment {
1314

1415
override fun <T : Any?> get(index: Int, type: Class<T>): T? {
1516
return get(index as Any, type)
@@ -20,10 +21,10 @@ class JasyncRow(private val rowData: RowData) : Row {
2021
}
2122

2223
override fun getMetadata(): RowMetadata {
23-
TODO("Not yet implemented")
24+
return metadata
2425
}
2526

26-
@Suppress("UNCHECKED_CAST", "IMPLICIT_CAST_TO_ANY")
27+
@Suppress("UNCHECKED_CAST")
2728
private fun <T> get(identifier: Any, requestedType: Class<T>): T? {
2829
val value = get(identifier)
2930
return when {
@@ -92,4 +93,8 @@ class JasyncRow(private val rowData: RowData) : Row {
9293
else -> value
9394
}
9495
}
96+
97+
override fun row(): Row {
98+
return this
99+
}
95100
}

r2dbc-mysql/src/test/java/com/github/jasync/r2dbc/mysql/integ/JasyncR2dbcIntegTest.kt

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import com.github.jasync.sql.db.mysql.MySQLConnection
55
import com.github.jasync.sql.db.mysql.pool.MySQLConnectionFactory
66
import com.github.jasync.sql.db.util.FP
77
import io.mockk.mockk
8+
import io.r2dbc.spi.Result
89
import org.assertj.core.api.Assertions
910
import org.awaitility.kotlin.await
1011
import org.junit.Test
@@ -63,4 +64,35 @@ class JasyncR2dbcIntegTest : R2dbcConnectionHelper() {
6364
await.until { rows == 1 }
6465
}
6566
}
67+
68+
@Test
69+
fun `filter test`() {
70+
withConnection { c ->
71+
var rows = 0
72+
executeQuery(c, createTable)
73+
executeQuery(c, """INSERT INTO users (name) VALUES ('Boogie Man'),('Dambeldor')""")
74+
val mycf = object : MySQLConnectionFactory(mockk()) {
75+
override fun create(): CompletableFuture<MySQLConnection> {
76+
return FP.successful(c)
77+
}
78+
}
79+
val cf = JasyncConnectionFactory(mycf)
80+
Mono.from(cf.create())
81+
.flatMapMany { connection ->
82+
connection
83+
.createStatement("SELECT name FROM users")
84+
.execute()
85+
}
86+
.map { result ->
87+
result
88+
// we test this function
89+
.filter { segment ->
90+
segment is Result.RowSegment && segment.row().get("name") == "Dambeldor"
91+
}
92+
}
93+
.doOnNext { rows++ }
94+
.subscribe()
95+
await.until { rows == 1 }
96+
}
97+
}
6698
}

0 commit comments

Comments
 (0)