From d0be1fe5c14e8663fc663889e657a56d11d4a7ef Mon Sep 17 00:00:00 2001 From: oshai Date: Fri, 13 Jan 2023 12:59:34 +0200 Subject: [PATCH 1/3] an initial impl for r2dbc filter/flatmap on operation create a new instance from result `JasyncSegmentResult` JasyncSegmentResult has a flux of segments which are rows + number of rows segment. Then filter and flatmap are operating on that flux. There is no error handling here as the original result don't have such concept. JasyncStatement.execute() either yields result or exception (exception is not a result). see #351 --- r2dbc-mysql/src/main/java/JasyncResult.kt | 52 +++++++++++++++++-- r2dbc-mysql/src/main/java/JasyncRow.kt | 11 ++-- .../src/main/java/JasyncSegmentResult.kt | 1 + 3 files changed, 57 insertions(+), 7 deletions(-) create mode 100644 r2dbc-mysql/src/main/java/JasyncSegmentResult.kt diff --git a/r2dbc-mysql/src/main/java/JasyncResult.kt b/r2dbc-mysql/src/main/java/JasyncResult.kt index 0130b939..14c48d59 100644 --- a/r2dbc-mysql/src/main/java/JasyncResult.kt +++ b/r2dbc-mysql/src/main/java/JasyncResult.kt @@ -2,6 +2,8 @@ package com.github.jasync.r2dbc.mysql import com.github.jasync.sql.db.ResultSet import io.r2dbc.spi.Result +import io.r2dbc.spi.Result.RowSegment +import io.r2dbc.spi.Row import io.r2dbc.spi.RowMetadata import org.reactivestreams.Publisher import reactor.core.publisher.Flux @@ -34,18 +36,60 @@ class JasyncResult( override fun map(mappingFunction: BiFunction): Publisher { return if (selectLastInsertId) { - Mono.fromSupplier { mappingFunction.apply(JasyncInsertSyntheticRow(generatedKeyName, lastInsertId), JasyncInsertSyntheticMetadata(generatedKeyName)) } + Mono.fromSupplier { + mappingFunction.apply( + JasyncInsertSyntheticRow(generatedKeyName, lastInsertId), + JasyncInsertSyntheticMetadata(generatedKeyName) + ) + } } else { Flux.fromIterable(resultSet) - .map { mappingFunction.apply(JasyncRow(it), metadata) } + .map { mappingFunction.apply(JasyncRow(it, metadata), metadata) } } } override fun filter(filter: Predicate): Result { - TODO("Not yet implemented") + return JasyncSegmentResult(this).filter(filter) } override fun flatMap(mappingFunction: Function>): Publisher { - TODO("Not yet implemented") + return JasyncSegmentResult(this).flatMap(mappingFunction) + } + + class JasyncSegmentResult private constructor( + private val segments: Flux, + private val result: JasyncResult + ) : Result { + constructor(result: JasyncResult) : this( + Flux.concat( + Flux.fromIterable(result.resultSet) + .map { JasyncRow(it, result.metadata) }, + Flux.just(Result.UpdateCount { result.rowsAffected }) + ), + result + ) + + override fun getRowsUpdated(): Publisher { + return result.rowsUpdated + } + + override fun map(mappingFunction: BiFunction): Publisher { + return segments + .handle { segment, sink -> + if (segment is RowSegment) { + sink.next(mappingFunction.apply(segment.row(), segment.row().metadata)) + } + } + } + + override fun filter(filter: Predicate): Result { + return JasyncSegmentResult(segments.filter(filter), result) + } + + override fun flatMap(mappingFunction: Function>): Publisher { + return segments.concatMap { segment: Result.Segment -> + mappingFunction.apply(segment) + } + } } } diff --git a/r2dbc-mysql/src/main/java/JasyncRow.kt b/r2dbc-mysql/src/main/java/JasyncRow.kt index 6abcfff9..d272c9c0 100644 --- a/r2dbc-mysql/src/main/java/JasyncRow.kt +++ b/r2dbc-mysql/src/main/java/JasyncRow.kt @@ -1,6 +1,7 @@ package com.github.jasync.r2dbc.mysql import com.github.jasync.sql.db.RowData +import io.r2dbc.spi.Result import io.r2dbc.spi.Row import io.r2dbc.spi.RowMetadata import java.math.BigDecimal @@ -9,7 +10,7 @@ import java.time.LocalDate import java.time.LocalDateTime import java.time.LocalTime -class JasyncRow(private val rowData: RowData) : Row { +class JasyncRow(private val rowData: RowData, private val metadata: JasyncMetadata) : Row, Result.RowSegment { override fun get(index: Int, type: Class): T? { return get(index as Any, type) @@ -20,10 +21,10 @@ class JasyncRow(private val rowData: RowData) : Row { } override fun getMetadata(): RowMetadata { - TODO("Not yet implemented") + return metadata } - @Suppress("UNCHECKED_CAST", "IMPLICIT_CAST_TO_ANY") + @Suppress("UNCHECKED_CAST") private fun get(identifier: Any, requestedType: Class): T? { val value = get(identifier) return when { @@ -92,4 +93,8 @@ class JasyncRow(private val rowData: RowData) : Row { else -> value } } + + override fun row(): Row { + return this + } } diff --git a/r2dbc-mysql/src/main/java/JasyncSegmentResult.kt b/r2dbc-mysql/src/main/java/JasyncSegmentResult.kt new file mode 100644 index 00000000..e1da174c --- /dev/null +++ b/r2dbc-mysql/src/main/java/JasyncSegmentResult.kt @@ -0,0 +1 @@ +package com.github.jasync.r2dbc.mysql From b4c843605ab15007eb5a2b33119e863f299a7a09 Mon Sep 17 00:00:00 2001 From: oshai Date: Fri, 13 Jan 2023 13:01:30 +0200 Subject: [PATCH 2/3] delete extra file --- r2dbc-mysql/src/main/java/JasyncSegmentResult.kt | 1 - 1 file changed, 1 deletion(-) delete mode 100644 r2dbc-mysql/src/main/java/JasyncSegmentResult.kt diff --git a/r2dbc-mysql/src/main/java/JasyncSegmentResult.kt b/r2dbc-mysql/src/main/java/JasyncSegmentResult.kt deleted file mode 100644 index e1da174c..00000000 --- a/r2dbc-mysql/src/main/java/JasyncSegmentResult.kt +++ /dev/null @@ -1 +0,0 @@ -package com.github.jasync.r2dbc.mysql From 474ffb32ef584bd9993d26789afa1f297b2ad00d Mon Sep 17 00:00:00 2001 From: oshai Date: Sat, 14 Jan 2023 17:52:07 +0200 Subject: [PATCH 3/3] add unit test for filter --- .../r2dbc/mysql/integ/JasyncR2dbcIntegTest.kt | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/r2dbc-mysql/src/test/java/com/github/jasync/r2dbc/mysql/integ/JasyncR2dbcIntegTest.kt b/r2dbc-mysql/src/test/java/com/github/jasync/r2dbc/mysql/integ/JasyncR2dbcIntegTest.kt index 87a68053..fa4b28d1 100644 --- a/r2dbc-mysql/src/test/java/com/github/jasync/r2dbc/mysql/integ/JasyncR2dbcIntegTest.kt +++ b/r2dbc-mysql/src/test/java/com/github/jasync/r2dbc/mysql/integ/JasyncR2dbcIntegTest.kt @@ -5,6 +5,7 @@ import com.github.jasync.sql.db.mysql.MySQLConnection import com.github.jasync.sql.db.mysql.pool.MySQLConnectionFactory import com.github.jasync.sql.db.util.FP import io.mockk.mockk +import io.r2dbc.spi.Result import org.assertj.core.api.Assertions import org.awaitility.kotlin.await import org.junit.Test @@ -63,4 +64,35 @@ class JasyncR2dbcIntegTest : R2dbcConnectionHelper() { await.until { rows == 1 } } } + + @Test + fun `filter test`() { + withConnection { c -> + var rows = 0 + executeQuery(c, createTable) + executeQuery(c, """INSERT INTO users (name) VALUES ('Boogie Man'),('Dambeldor')""") + val mycf = object : MySQLConnectionFactory(mockk()) { + override fun create(): CompletableFuture { + return FP.successful(c) + } + } + val cf = JasyncConnectionFactory(mycf) + Mono.from(cf.create()) + .flatMapMany { connection -> + connection + .createStatement("SELECT name FROM users") + .execute() + } + .map { result -> + result + // we test this function + .filter { segment -> + segment is Result.RowSegment && segment.row().get("name") == "Dambeldor" + } + } + .doOnNext { rows++ } + .subscribe() + await.until { rows == 1 } + } + } }