Skip to content

an initial impl for r2dbc filter/flatmap #356

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jan 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 48 additions & 4 deletions r2dbc-mysql/src/main/java/JasyncResult.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package com.github.jasync.r2dbc.mysql

import com.github.jasync.sql.db.ResultSet
import io.r2dbc.spi.Result
import io.r2dbc.spi.Result.RowSegment
import io.r2dbc.spi.Row
import io.r2dbc.spi.RowMetadata
import org.reactivestreams.Publisher
import reactor.core.publisher.Flux
Expand Down Expand Up @@ -34,18 +36,60 @@ class JasyncResult(

override fun <T> map(mappingFunction: BiFunction<io.r2dbc.spi.Row, RowMetadata, out T>): Publisher<T> {
return if (selectLastInsertId) {
Mono.fromSupplier { mappingFunction.apply(JasyncInsertSyntheticRow(generatedKeyName, lastInsertId), JasyncInsertSyntheticMetadata(generatedKeyName)) }
Mono.fromSupplier {
mappingFunction.apply(
JasyncInsertSyntheticRow(generatedKeyName, lastInsertId),
JasyncInsertSyntheticMetadata(generatedKeyName)
)
}
} else {
Flux.fromIterable(resultSet)
.map { mappingFunction.apply(JasyncRow(it), metadata) }
.map { mappingFunction.apply(JasyncRow(it, metadata), metadata) }
}
}

override fun filter(filter: Predicate<Result.Segment>): Result {
TODO("Not yet implemented")
return JasyncSegmentResult(this).filter(filter)
}

override fun <T : Any?> flatMap(mappingFunction: Function<Result.Segment, out Publisher<out T>>): Publisher<T> {
TODO("Not yet implemented")
return JasyncSegmentResult(this).flatMap(mappingFunction)
}

class JasyncSegmentResult private constructor(
private val segments: Flux<Result.Segment>,
private val result: JasyncResult
) : Result {
constructor(result: JasyncResult) : this(
Flux.concat(
Flux.fromIterable(result.resultSet)
.map { JasyncRow(it, result.metadata) },
Flux.just(Result.UpdateCount { result.rowsAffected })
),
result
)

override fun getRowsUpdated(): Publisher<Long> {
return result.rowsUpdated
}

override fun <T : Any?> map(mappingFunction: BiFunction<Row, RowMetadata, out T>): Publisher<T> {
return segments
.handle { segment, sink ->
if (segment is RowSegment) {
sink.next(mappingFunction.apply(segment.row(), segment.row().metadata))
}
}
}

override fun filter(filter: Predicate<Result.Segment>): Result {
return JasyncSegmentResult(segments.filter(filter), result)
}

override fun <T : Any?> flatMap(mappingFunction: Function<Result.Segment, out Publisher<out T>>): Publisher<T> {
return segments.concatMap { segment: Result.Segment ->
mappingFunction.apply(segment)
}
}
}
}
11 changes: 8 additions & 3 deletions r2dbc-mysql/src/main/java/JasyncRow.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.github.jasync.r2dbc.mysql

import com.github.jasync.sql.db.RowData
import io.r2dbc.spi.Result
import io.r2dbc.spi.Row
import io.r2dbc.spi.RowMetadata
import java.math.BigDecimal
Expand All @@ -9,7 +10,7 @@ import java.time.LocalDate
import java.time.LocalDateTime
import java.time.LocalTime

class JasyncRow(private val rowData: RowData) : Row {
class JasyncRow(private val rowData: RowData, private val metadata: JasyncMetadata) : Row, Result.RowSegment {

override fun <T : Any?> get(index: Int, type: Class<T>): T? {
return get(index as Any, type)
Expand All @@ -20,10 +21,10 @@ class JasyncRow(private val rowData: RowData) : Row {
}

override fun getMetadata(): RowMetadata {
TODO("Not yet implemented")
return metadata
}

@Suppress("UNCHECKED_CAST", "IMPLICIT_CAST_TO_ANY")
@Suppress("UNCHECKED_CAST")
private fun <T> get(identifier: Any, requestedType: Class<T>): T? {
val value = get(identifier)
return when {
Expand Down Expand Up @@ -92,4 +93,8 @@ class JasyncRow(private val rowData: RowData) : Row {
else -> value
}
}

override fun row(): Row {
return this
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import com.github.jasync.sql.db.mysql.MySQLConnection
import com.github.jasync.sql.db.mysql.pool.MySQLConnectionFactory
import com.github.jasync.sql.db.util.FP
import io.mockk.mockk
import io.r2dbc.spi.Result
import org.assertj.core.api.Assertions
import org.awaitility.kotlin.await
import org.junit.Test
Expand Down Expand Up @@ -63,4 +64,35 @@ class JasyncR2dbcIntegTest : R2dbcConnectionHelper() {
await.until { rows == 1 }
}
}

@Test
fun `filter test`() {
withConnection { c ->
var rows = 0
executeQuery(c, createTable)
executeQuery(c, """INSERT INTO users (name) VALUES ('Boogie Man'),('Dambeldor')""")
val mycf = object : MySQLConnectionFactory(mockk()) {
override fun create(): CompletableFuture<MySQLConnection> {
return FP.successful(c)
}
}
val cf = JasyncConnectionFactory(mycf)
Mono.from(cf.create())
.flatMapMany { connection ->
connection
.createStatement("SELECT name FROM users")
.execute()
}
.map { result ->
result
// we test this function
.filter { segment ->
segment is Result.RowSegment && segment.row().get("name") == "Dambeldor"
}
}
.doOnNext { rows++ }
.subscribe()
await.until { rows == 1 }
}
}
}