passwordPublisher) {
return this;
}
+ /**
+ * Sets the {@link AddressResolverGroup} for resolving host addresses.
+ *
+ * This can be used to customize the DNS resolution mechanism, which is particularly useful in environments
+ * with specific DNS configuration needs or where a custom DNS resolver is required.
+ *
+ * @param resolver the resolver group to use for host address resolution.
+ * @return this {@link Builder}.
+ * @since 1.2.0
+ */
+ public Builder resolver(AddressResolverGroup> resolver) {
+ this.resolver = resolver;
+ return this;
+ }
+
+ /**
+ * Option to enable metrics to be collected and registered in Micrometer's globalRegistry
+ * with {@link reactor.netty.tcp.TcpClient#metrics(boolean)}. Defaults to {@code false}.
+ *
+ * Note: It is required to add {@code io.micrometer.micrometer-core} dependency to classpath.
+ *
+ * @param enabled enable metrics for {@link reactor.netty.tcp.TcpClient}.
+ * @return this {@link Builder}
+ * @throws IllegalArgumentException if {@code io.micrometer:micrometer-core} is not on the classpath.
+ * @since 1.3.2
+ */
+ public Builder metrics(boolean enabled) {
+ require(!enabled || Metrics.isMicrometerAvailable(),
+ "dependency `io.micrometer:micrometer-core` must be added to classpath if metrics enabled"
+ );
+ this.metrics = enabled;
+ return this;
+ }
+
+ /**
+ * Option to whether the driver should interpret MySQL's TINYINT(1) as a BIT type.
+ * When enabled, TINYINT(1) columns will be treated as BIT. Defaults to {@code true}.
+ *
+ * Note: Only signed TINYINT(1) columns can be treated as BIT or Boolean.
+ * Ref: https://bugs.mysql.com/bug.php?id=100309
+ *
+ * @param tinyInt1isBit {@code true} to treat TINYINT(1) as BIT
+ * @return this {@link Builder}
+ * @since 1.4.0
+ */
+ public Builder tinyInt1isBit(boolean tinyInt1isBit) {
+ this.tinyInt1isBit = tinyInt1isBit;
+ return this;
+ }
+
private SslMode requireSslMode() {
SslMode sslMode = this.sslMode;
diff --git a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactory.java b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactory.java
index d003db2b0..094674f2a 100644
--- a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactory.java
+++ b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactory.java
@@ -42,68 +42,65 @@
*/
public final class MySqlConnectionFactory implements ConnectionFactory {
- private final Mono extends MySqlConnection> client;
+ private final MySqlConnectionConfiguration configuration;
+ private final LazyQueryCache queryCache;
- private MySqlConnectionFactory(Mono extends MySqlConnection> client) {
- this.client = client;
+ private MySqlConnectionFactory(MySqlConnectionConfiguration configuration) {
+ this.configuration = configuration;
+ this.queryCache = new LazyQueryCache(configuration.getQueryCacheSize());
}
@Override
public Mono extends MySqlConnection> create() {
- return client;
- }
-
- @Override
- public ConnectionFactoryMetadata getMetadata() {
- return MySqlConnectionFactoryMetadata.INSTANCE;
- }
+ MySqlSslConfiguration ssl;
+ SocketAddress address;
- /**
- * Creates a {@link MySqlConnectionFactory} with a {@link MySqlConnectionConfiguration}.
- *
- * @param configuration the {@link MySqlConnectionConfiguration}.
- * @return configured {@link MySqlConnectionFactory}.
- */
- public static MySqlConnectionFactory from(MySqlConnectionConfiguration configuration) {
- requireNonNull(configuration, "configuration must not be null");
-
- LazyQueryCache queryCache = new LazyQueryCache(configuration.getQueryCacheSize());
-
- return new MySqlConnectionFactory(Mono.defer(() -> {
- MySqlSslConfiguration ssl;
- SocketAddress address;
-
- if (configuration.isHost()) {
- ssl = configuration.getSsl();
- address = InetSocketAddress.createUnresolved(configuration.getDomain(),
+ if (configuration.isHost()) {
+ ssl = configuration.getSsl();
+ address = InetSocketAddress.createUnresolved(configuration.getDomain(),
configuration.getPort());
- } else {
- ssl = MySqlSslConfiguration.disabled();
- address = new DomainSocketAddress(configuration.getDomain());
- }
+ } else {
+ ssl = MySqlSslConfiguration.disabled();
+ address = new DomainSocketAddress(configuration.getDomain());
+ }
- String user = configuration.getUser();
- CharSequence password = configuration.getPassword();
- Publisher passwordPublisher = configuration.getPasswordPublisher();
+ String user = configuration.getUser();
+ CharSequence password = configuration.getPassword();
+ Publisher passwordPublisher = configuration.getPasswordPublisher();
- if (Objects.nonNull(passwordPublisher)) {
- return Mono.from(passwordPublisher).flatMap(token -> getMySqlConnection(
+ if (Objects.nonNull(passwordPublisher)) {
+ return Mono.from(passwordPublisher).flatMap(token -> getMySqlConnection(
configuration, ssl,
queryCache,
address,
user,
token
- ));
- }
+ ));
+ }
- return getMySqlConnection(
+ return getMySqlConnection(
configuration, ssl,
queryCache,
address,
user,
password
- );
- }));
+ );
+ }
+
+ @Override
+ public ConnectionFactoryMetadata getMetadata() {
+ return MySqlConnectionFactoryMetadata.INSTANCE;
+ }
+
+ /**
+ * Creates a {@link MySqlConnectionFactory} with a {@link MySqlConnectionConfiguration}.
+ *
+ * @param configuration the {@link MySqlConnectionConfiguration}.
+ * @return configured {@link MySqlConnectionFactory}.
+ */
+ public static MySqlConnectionFactory from(MySqlConnectionConfiguration configuration) {
+ requireNonNull(configuration, "configuration must not be null");
+ return new MySqlConnectionFactory(configuration);
}
/**
@@ -137,6 +134,7 @@ private static Mono getMySqlConnection(
configuration.getZeroDateOption(),
configuration.getLoadLocalInfilePath(),
configuration.getLocalInfileBufferSize(),
+ configuration.isTinyInt1isBit(),
configuration.isPreserveInstants(),
connectionTimeZone
);
@@ -147,7 +145,9 @@ private static Mono getMySqlConnection(
configuration.isTcpNoDelay(),
context,
configuration.getConnectTimeout(),
- configuration.getLoopResources()
+ configuration.getLoopResources(),
+ configuration.getResolver(),
+ configuration.isMetrics()
)).flatMap(client -> {
// Lazy init database after handshake/login
boolean deferDatabase = configuration.isCreateDatabaseIfNotExist();
diff --git a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactoryProvider.java b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactoryProvider.java
index f6dc1a57a..5905c56ca 100644
--- a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactoryProvider.java
+++ b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactoryProvider.java
@@ -20,6 +20,7 @@
import io.asyncer.r2dbc.mysql.constant.SslMode;
import io.asyncer.r2dbc.mysql.constant.ZeroDateOption;
import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.resolver.AddressResolverGroup;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactoryOptions;
import io.r2dbc.spi.ConnectionFactoryProvider;
@@ -308,6 +309,38 @@ public final class MySqlConnectionFactoryProvider implements ConnectionFactoryPr
*/
public static final Option> PASSWORD_PUBLISHER = Option.valueOf("passwordPublisher");
+ /**
+ * Option to set the {@link AddressResolverGroup} for resolving host addresses.
+ *
+ * This can be used to customize the DNS resolution mechanism, which is particularly useful in environments
+ * with specific DNS configuration needs or where a custom DNS resolver is required.
+ *
+ *
+ * @since 1.2.0
+ */
+ public static final Option> RESOLVER = Option.valueOf("resolver");
+
+ /**
+ * Option to enable metrics to be collected and registered in Micrometer's globalRegistry
+ * with {@link reactor.netty.tcp.TcpClient#metrics(boolean)}. Defaults to {@code false}.
+ *
+ * Note: It is required to add {@code io.micrometer.micrometer-core} dependency to classpath.
+ *
+ * @since 1.3.2
+ */
+ public static final Option METRICS = Option.valueOf("metrics");
+
+ /**
+ * Option to whether the driver should interpret MySQL's TINYINT(1) as a BIT type.
+ * When enabled, TINYINT(1) columns will be treated as BIT. Defaults to {@code true}.
+ *
+ * Note: Only signed TINYINT(1) columns can be treated as BIT or Boolean.
+ * Ref: https://bugs.mysql.com/bug.php?id=100309
+ *
+ * @since 1.4.0
+ */
+ public static final Option TINY_INT_1_IS_BIT = Option.valueOf("tinyInt1isBit");
+
@Override
public ConnectionFactory create(ConnectionFactoryOptions options) {
requireNonNull(options, "connectionFactoryOptions must not be null");
@@ -389,6 +422,8 @@ static MySqlConnectionConfiguration setup(ConnectionFactoryOptions options) {
.to(builder::loopResources);
mapper.optional(PASSWORD_PUBLISHER).as(Publisher.class)
.to(builder::passwordPublisher);
+ mapper.optional(RESOLVER).as(AddressResolverGroup.class)
+ .to(builder::resolver);
mapper.optional(SESSION_VARIABLES).asArray(
String[].class,
Function.identity(),
@@ -399,6 +434,10 @@ static MySqlConnectionConfiguration setup(ConnectionFactoryOptions options) {
.to(builder::lockWaitTimeout);
mapper.optional(STATEMENT_TIMEOUT).as(Duration.class, Duration::parse)
.to(builder::statementTimeout);
+ mapper.optional(METRICS).asBoolean()
+ .to(builder::metrics);
+ mapper.optional(TINY_INT_1_IS_BIT).asBoolean()
+ .to(builder::tinyInt1isBit);
return builder.build();
}
diff --git a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/client/Client.java b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/client/Client.java
index d7c3ac28a..316d90999 100644
--- a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/client/Client.java
+++ b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/client/Client.java
@@ -22,6 +22,7 @@
import io.asyncer.r2dbc.mysql.message.server.ServerMessage;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelOption;
+import io.netty.resolver.AddressResolverGroup;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import org.jetbrains.annotations.Nullable;
@@ -126,19 +127,21 @@ public interface Client {
* @param context the connection context
* @param connectTimeout connect timeout, or {@code null} if it has no timeout
* @param loopResources the loop resources to use
+ * @param metrics if enable the {@link TcpClient#metrics)}
* @return A {@link Mono} that will emit a connected {@link Client}.
* @throws IllegalArgumentException if {@code ssl}, {@code address} or {@code context} is {@code null}.
* @throws ArithmeticException if {@code connectTimeout} milliseconds overflow as an int
*/
static Mono connect(MySqlSslConfiguration ssl, SocketAddress address, boolean tcpKeepAlive,
boolean tcpNoDelay, ConnectionContext context, @Nullable Duration connectTimeout,
- LoopResources loopResources) {
+ LoopResources loopResources, @Nullable AddressResolverGroup> resolver, boolean metrics) {
requireNonNull(ssl, "ssl must not be null");
requireNonNull(address, "address must not be null");
requireNonNull(context, "context must not be null");
TcpClient tcpClient = TcpClient.newConnection()
- .runOn(loopResources);
+ .runOn(loopResources)
+ .metrics(metrics);
if (connectTimeout != null) {
tcpClient = tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
@@ -150,6 +153,10 @@ static Mono connect(MySqlSslConfiguration ssl, SocketAddress address, bo
tcpClient = tcpClient.option(ChannelOption.TCP_NODELAY, tcpNoDelay);
}
+ if (resolver != null) {
+ tcpClient = tcpClient.resolver(resolver);
+ }
+
return tcpClient.remoteAddress(() -> address).connect()
.map(conn -> new ReactorNettyClient(conn, ssl, context));
}
diff --git a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/client/ReactorNettyClient.java b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/client/ReactorNettyClient.java
index 81cb5f21e..5054f3631 100644
--- a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/client/ReactorNettyClient.java
+++ b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/client/ReactorNettyClient.java
@@ -118,7 +118,7 @@ final class ReactorNettyClient implements Client {
}
sink.next((ServerMessage) it);
} else {
- // ReferenceCounted will released by Netty.
+ // ReferenceCounted will be released by Netty.
throw ClientExceptions.unsupportedProtocol(it.getClass().getTypeName());
}
})
@@ -131,6 +131,14 @@ final class ReactorNettyClient implements Client {
logger.debug("Request: {}", message);
}
+ if (message == ExitMessage.INSTANCE) {
+ if (STATE_UPDATER.compareAndSet(this, ST_CONNECTED, ST_CLOSING)) {
+ logger.debug("Exit message sent");
+ } else {
+ logger.debug("Exit message sent (duplicated / connection already closed)");
+ }
+ }
+
if (message.isSequenceReset()) {
resetSequence(connection);
}
@@ -213,15 +221,8 @@ public Mono close() {
requestQueue.submit(RequestTask.wrap(sink, Mono.fromRunnable(() -> {
Sinks.EmitResult result = requests.tryEmitNext(ExitMessage.INSTANCE);
-
if (result != Sinks.EmitResult.OK) {
logger.error("Exit message sending failed due to {}, force closing", result);
- } else {
- if (STATE_UPDATER.compareAndSet(this, ST_CONNECTED, ST_CLOSING)) {
- logger.debug("Exit message sent");
- } else {
- logger.debug("Exit message sent (duplicated / connection already closed)");
- }
}
})));
}).flatMap(Function.identity()).onErrorResume(e -> {
@@ -378,17 +379,17 @@ public void error(Throwable e) {
@Override
public void next(ServerMessage message) {
- if (message instanceof WarningMessage) {
- int warnings = ((WarningMessage) message).getWarnings();
- if (warnings == 0) {
- if (DEBUG_ENABLED) {
+ if (DEBUG_ENABLED) {
+ if (message instanceof WarningMessage) {
+ final int warnings = ((WarningMessage) message).getWarnings();
+ if (warnings == 0) {
logger.debug("Response: {}", message);
+ } else {
+ logger.debug("Response: {}, reports {} warning(s)", message, warnings);
}
- } else if (INFO_ENABLED) {
- logger.info("Response: {}, reports {} warning(s)", message, warnings);
+ } else {
+ logger.debug("Response: {}", message);
}
- } else if (DEBUG_ENABLED) {
- logger.debug("Response: {}", message);
}
responseProcessor.emitNext(message, EmitFailureHandler.FAIL_FAST);
diff --git a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/client/SslBridgeHandler.java b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/client/SslBridgeHandler.java
index ce3361fd4..22038bb43 100644
--- a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/client/SslBridgeHandler.java
+++ b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/client/SslBridgeHandler.java
@@ -33,6 +33,7 @@
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
+import reactor.core.Exceptions;
import reactor.netty.tcp.SslProvider;
import javax.net.ssl.HostnameVerifier;
@@ -40,7 +41,6 @@
import javax.net.ssl.SSLException;
import java.io.File;
import java.net.InetSocketAddress;
-import java.util.function.Consumer;
import static io.asyncer.r2dbc.mysql.internal.util.AssertUtils.requireNonNull;
import static io.netty.handler.ssl.SslProvider.JDK;
@@ -151,10 +151,16 @@ private void handleSslState(ChannelHandlerContext ctx, SslState state) {
switch (state) {
case BRIDGING:
logger.debug("SSL event triggered, enable SSL handler to pipeline");
-
- SslProvider sslProvider = SslProvider.builder()
- .sslContext(MySqlSslContextSpec.forClient(ssl, context))
- .build();
+ final SslProvider sslProvider;
+ try {
+ // Workaround for a forward incompatible change in reactor-netty version 1.2.0
+ // See: https://github.com/reactor/reactor-netty/commit/6d0c24d83a7c5b15e403475272293f847415191c
+ sslProvider = SslProvider.builder()
+ .sslContext(MySqlSslContextSpec.forClient(ssl, context).sslContext())
+ .build();
+ } catch (SSLException e) {
+ throw Exceptions.propagate(e);
+ }
SslHandler sslHandler = sslProvider.getSslContext().newHandler(ctx.alloc());
this.sslEngine = sslHandler.engine();
@@ -195,7 +201,7 @@ private static boolean isTls13Enabled(ConnectionContext context) {
|| (version.isGreaterThanOrEqualTo(MYSQL_5_6_0) && version.isEnterprise());
}
- private static final class MySqlSslContextSpec implements SslProvider.ProtocolSslContextSpec {
+ private static final class MySqlSslContextSpec {
private final SslContextBuilder builder;
@@ -203,16 +209,6 @@ private MySqlSslContextSpec(SslContextBuilder builder) {
this.builder = builder;
}
- @Override
- public MySqlSslContextSpec configure(Consumer customizer) {
- requireNonNull(customizer, "customizer must not be null");
-
- customizer.accept(builder);
-
- return this;
- }
-
- @Override
public SslContext sslContext() throws SSLException {
return builder.build();
}
diff --git a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/codec/BooleanCodec.java b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/codec/BooleanCodec.java
index f546ba751..8fb98c273 100644
--- a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/codec/BooleanCodec.java
+++ b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/codec/BooleanCodec.java
@@ -16,12 +16,15 @@
package io.asyncer.r2dbc.mysql.codec;
+import java.math.BigInteger;
+
import io.asyncer.r2dbc.mysql.MySqlParameter;
import io.asyncer.r2dbc.mysql.ParameterWriter;
import io.asyncer.r2dbc.mysql.api.MySqlReadableMetadata;
import io.asyncer.r2dbc.mysql.constant.MySqlType;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
+import io.r2dbc.spi.R2dbcNonTransientResourceException;
import reactor.core.publisher.Mono;
/**
@@ -29,6 +32,8 @@
*/
final class BooleanCodec extends AbstractPrimitiveCodec {
+ private static final Integer INTEGER_ONE = Integer.valueOf(1);
+
static final BooleanCodec INSTANCE = new BooleanCodec();
private BooleanCodec() {
@@ -38,7 +43,35 @@ private BooleanCodec() {
@Override
public Boolean decode(ByteBuf value, MySqlReadableMetadata metadata, Class> target, boolean binary,
CodecContext context) {
- return binary || metadata.getType() == MySqlType.BIT ? value.readBoolean() : value.readByte() != '0';
+ MySqlType dataType = metadata.getType();
+
+ if (dataType == MySqlType.VARCHAR) {
+ if (!value.isReadable()) {
+ return createFromLong(0);
+ }
+
+ String s = value.toString(metadata.getCharCollation(context).getCharset());
+
+ if (s.equalsIgnoreCase("Y") || s.equalsIgnoreCase("yes") ||
+ s.equalsIgnoreCase("T") || s.equalsIgnoreCase("true")) {
+ return createFromLong(1);
+ } else if (s.equalsIgnoreCase("N") || s.equalsIgnoreCase("no") ||
+ s.equalsIgnoreCase("F") || s.equalsIgnoreCase("false")) {
+ return createFromLong(0);
+ } else if (s.matches("-?\\d*\\.\\d*") || s.matches("-?\\d*\\.\\d+[eE]-?\\d+")
+ || s.matches("-?\\d*[eE]-?\\d+")) {
+ return createFromDouble(Double.parseDouble(s));
+ } else if (s.matches("-?\\d+")) {
+ if (!CodecUtils.isGreaterThanLongMax(s)) {
+ return createFromLong(CodecUtils.parseLong(value));
+ }
+ return createFromBigInteger(new BigInteger(s));
+ }
+ throw new R2dbcNonTransientResourceException("The value '" + s + "' of type '" + dataType +
+ "' cannot be encoded into a Boolean.", "22018");
+ }
+
+ return binary || dataType == MySqlType.BIT ? value.readBoolean() : value.readByte() != '0';
}
@Override
@@ -54,8 +87,20 @@ public MySqlParameter encode(Object value, CodecContext context) {
@Override
public boolean doCanDecode(MySqlReadableMetadata metadata) {
MySqlType type = metadata.getType();
- return (type == MySqlType.BIT || type == MySqlType.TINYINT) &&
- Integer.valueOf(1).equals(metadata.getPrecision());
+ return ((type == MySqlType.BIT || type == MySqlType.TINYINT) &&
+ INTEGER_ONE.equals(metadata.getPrecision())) || type == MySqlType.VARCHAR;
+ }
+
+ public Boolean createFromLong(long l) {
+ return (l == -1 || l > 0);
+ }
+
+ public Boolean createFromDouble(double d) {
+ return (d == -1.0d || d > 0);
+ }
+
+ public Boolean createFromBigInteger(BigInteger b) {
+ return b.compareTo(BigInteger.valueOf(0)) > 0 || b.compareTo(BigInteger.valueOf(-1)) == 0;
}
private static final class BooleanMySqlParameter extends AbstractMySqlParameter {
diff --git a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/codec/ByteArrayInputStreamCodec.java b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/codec/ByteArrayInputStreamCodec.java
new file mode 100644
index 000000000..c31261f5a
--- /dev/null
+++ b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/codec/ByteArrayInputStreamCodec.java
@@ -0,0 +1,150 @@
+/*
+ * Copyright 2025 asyncer.io projects
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.asyncer.r2dbc.mysql.codec;
+
+import io.asyncer.r2dbc.mysql.MySqlParameter;
+import io.asyncer.r2dbc.mysql.ParameterWriter;
+import io.asyncer.r2dbc.mysql.api.MySqlReadableMetadata;
+import io.asyncer.r2dbc.mysql.constant.MySqlType;
+import io.asyncer.r2dbc.mysql.internal.util.VarIntUtils;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import reactor.core.publisher.Mono;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+
+import static io.asyncer.r2dbc.mysql.internal.util.InternalArrays.EMPTY_BYTES;
+
+/**
+ * Codec for {@link InputStream}.
+ */
+final class ByteArrayInputStreamCodec extends AbstractClassedCodec {
+
+ static final ByteArrayInputStreamCodec INSTANCE = new ByteArrayInputStreamCodec();
+
+ private ByteArrayInputStreamCodec() {
+ super(ByteArrayInputStream.class);
+ }
+
+ @Override
+ public ByteArrayInputStream decode(ByteBuf value, MySqlReadableMetadata metadata, Class> target, boolean binary,
+ CodecContext context) {
+ if (!value.isReadable()) {
+ return new ByteArrayInputStream(EMPTY_BYTES);
+ }
+ return new ByteArrayInputStream(value.array());
+ }
+
+ @Override
+ protected boolean doCanDecode(MySqlReadableMetadata metadata) {
+ return metadata.getType().isBinary();
+ }
+
+ @Override
+ public boolean canEncode(Object value) {
+ return value instanceof ByteArrayInputStream;
+ }
+
+ @Override
+ public MySqlParameter encode(Object value, CodecContext context) {
+ return new ByteArrayInputStreamMysqlParameter((ByteArrayInputStream) value);
+ }
+
+ private static final class ByteArrayInputStreamMysqlParameter extends AbstractMySqlParameter {
+
+ private final ByteArrayInputStream value;
+
+ private ByteArrayInputStreamMysqlParameter(ByteArrayInputStream value) {
+ this.value = value;
+ }
+
+ @Override
+ public Mono publishBinary(ByteBufAllocator allocator) {
+ return Mono.fromSupplier(() -> {
+ int size = value.available();
+ if (size == 0) {
+ return allocator.buffer(Byte.BYTES).writeByte(0);
+ }
+
+ int addedSize = VarIntUtils.varIntBytes(size);
+ ByteBuf buf = allocator.buffer(addedSize + size);
+
+ try {
+ VarIntUtils.writeVarInt(buf, size);
+ int readBytes = buf.writeBytes(value, size);
+ if (readBytes != size) {
+ buf.release();
+ throw new IllegalStateException("Expected to read " + size + " bytes, but got " + readBytes);
+ }
+
+ return buf;
+ } catch (Exception e) {
+ buf.release();
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ @Override
+ public Mono publishText(ParameterWriter writer) {
+ return Mono.fromRunnable(() -> {
+ try {
+ int size = value.available();
+ byte[] byteArray = new byte[size];
+ int readBytes = value.read(byteArray);
+
+ if (size != 0 && readBytes != size) {
+ throw new IllegalStateException("Expected to read " + size + " bytes, but got " + readBytes);
+ }
+
+ writer.writeHex(byteArray);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ @Override
+ public String toString() {
+ return value.toString();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof ByteArrayInputStreamMysqlParameter)) {
+ return false;
+ }
+
+ ByteArrayInputStreamMysqlParameter that = (ByteArrayInputStreamMysqlParameter) o;
+ return value.equals(that.value);
+ }
+
+ @Override
+ public int hashCode() {
+ return value.hashCode();
+ }
+
+ @Override
+ public MySqlType getType() {
+ return MySqlType.VARBINARY;
+ }
+ }
+}
diff --git a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/codec/CodecContext.java b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/codec/CodecContext.java
index 8eda9c985..5b58fa5f6 100644
--- a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/codec/CodecContext.java
+++ b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/codec/CodecContext.java
@@ -69,4 +69,10 @@ public interface CodecContext {
* @return if is MariaDB.
*/
boolean isMariaDb();
+
+ /**
+ *
+ * @return true if tinyInt(1) is treated as bit.
+ */
+ boolean isTinyInt1isBit();
}
diff --git a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/codec/DefaultCodecs.java b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/codec/DefaultCodecs.java
index d76b398e2..01e47348c 100644
--- a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/codec/DefaultCodecs.java
+++ b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/codec/DefaultCodecs.java
@@ -18,6 +18,7 @@
import io.asyncer.r2dbc.mysql.MySqlParameter;
import io.asyncer.r2dbc.mysql.api.MySqlReadableMetadata;
+import io.asyncer.r2dbc.mysql.constant.MySqlType;
import io.asyncer.r2dbc.mysql.internal.util.InternalArrays;
import io.asyncer.r2dbc.mysql.message.FieldValue;
import io.asyncer.r2dbc.mysql.message.LargeFieldValue;
@@ -80,7 +81,8 @@ final class DefaultCodecs implements Codecs {
BlobCodec.INSTANCE,
ByteBufferCodec.INSTANCE,
- ByteArrayCodec.INSTANCE
+ ByteArrayCodec.INSTANCE,
+ ByteArrayInputStreamCodec.INSTANCE
);
private final List> codecs;
@@ -136,6 +138,7 @@ private DefaultCodecs(List> codecs) {
* Note: this method should NEVER release {@code buf} because of it come from {@code MySqlRow} which will release
* this buffer.
*/
+ @Nullable
@Override
public T decode(FieldValue value, MySqlReadableMetadata metadata, Class> type, boolean binary,
CodecContext context) {
@@ -150,7 +153,7 @@ public T decode(FieldValue value, MySqlReadableMetadata metadata, Class> t
return null;
}
- Class> target = chooseClass(metadata, type);
+ Class> target = chooseClass(metadata, type, context);
if (value instanceof NormalFieldValue) {
return decodeNormal((NormalFieldValue) value, metadata, target, binary, context);
@@ -161,6 +164,7 @@ public T decode(FieldValue value, MySqlReadableMetadata metadata, Class> t
throw new IllegalArgumentException("Unknown value " + value.getClass().getSimpleName());
}
+ @Nullable
@Override
public T decode(FieldValue value, MySqlReadableMetadata metadata, ParameterizedType type,
boolean binary, CodecContext context) {
@@ -358,11 +362,34 @@ private T decodeMassive(LargeFieldValue value, MySqlReadableMetadata metadat
* @param type the {@link Class} specified by the user.
* @return the {@link Class} to use for decoding.
*/
- private static Class> chooseClass(MySqlReadableMetadata metadata, Class> type) {
- Class> javaType = metadata.getType().getJavaType();
+ private static Class> chooseClass(final MySqlReadableMetadata metadata, Class> type,
+ final CodecContext codecContext) {
+ final Class> javaType = getDefaultJavaType(metadata, codecContext);
return type.isAssignableFrom(javaType) ? javaType : type;
}
+
+ private static boolean shouldBeTreatedAsBoolean(final @Nullable Integer precision, final MySqlType type,
+ final CodecContext context) {
+ if (precision == null || precision != 1) {
+ return false;
+ }
+ // ref: https://github.com/asyncer-io/r2dbc-mysql/issues/277
+ // BIT(1) should be treated as Boolean by default.
+ return type == MySqlType.BIT || type == MySqlType.TINYINT && context.isTinyInt1isBit();
+ }
+
+ private static Class> getDefaultJavaType(final MySqlReadableMetadata metadata, final CodecContext codecContext) {
+ final MySqlType type = metadata.getType();
+ final Integer precision = metadata.getPrecision();
+
+ if (shouldBeTreatedAsBoolean(precision, type, codecContext)) {
+ return Boolean.class;
+ }
+
+ return type.getJavaType();
+ }
+
static final class Builder implements CodecsBuilder {
@GuardedBy("lock")
diff --git a/r2dbc-mysql/src/test/java/io/asyncer/r2dbc/mysql/ConnectionContextTest.java b/r2dbc-mysql/src/test/java/io/asyncer/r2dbc/mysql/ConnectionContextTest.java
index 5d0635412..eb4a3ea3c 100644
--- a/r2dbc-mysql/src/test/java/io/asyncer/r2dbc/mysql/ConnectionContextTest.java
+++ b/r2dbc-mysql/src/test/java/io/asyncer/r2dbc/mysql/ConnectionContextTest.java
@@ -39,7 +39,7 @@ void getTimeZone() {
String id = i < 0 ? "UTC" + i : "UTC+" + i;
ConnectionContext context = new ConnectionContext(
ZeroDateOption.USE_NULL, null,
- 8192, true, ZoneId.of(id));
+ 8192, true, true, ZoneId.of(id));
assertThat(context.getTimeZone()).isEqualTo(ZoneId.of(id));
}
@@ -48,7 +48,7 @@ void getTimeZone() {
@Test
void setTwiceTimeZone() {
ConnectionContext context = new ConnectionContext(ZeroDateOption.USE_NULL, null,
- 8192, true, null);
+ 8192, true, true, null);
context.initSession(
Caches.createPrepareCache(0),
@@ -70,7 +70,7 @@ void setTwiceTimeZone() {
@Test
void badSetTimeZone() {
ConnectionContext context = new ConnectionContext(ZeroDateOption.USE_NULL, null,
- 8192, true, ZoneId.systemDefault());
+ 8192, true, true, ZoneId.systemDefault());
assertThatIllegalStateException().isThrownBy(() -> context.initSession(
Caches.createPrepareCache(0),
IsolationLevel.REPEATABLE_READ,
@@ -91,7 +91,7 @@ public static ConnectionContext mock(boolean isMariaDB) {
public static ConnectionContext mock(boolean isMariaDB, ZoneId zoneId) {
ConnectionContext context = new ConnectionContext(ZeroDateOption.USE_NULL, null,
- 8192, true, zoneId);
+ 8192, true, true, zoneId);
context.initHandshake(1, ServerVersion.parse(isMariaDB ? "11.2.22.MOCKED" : "8.0.11.MOCKED"),
Capability.of(~(isMariaDB ? 1 : 0)));
diff --git a/r2dbc-mysql/src/test/java/io/asyncer/r2dbc/mysql/ConnectionIntegrationTest.java b/r2dbc-mysql/src/test/java/io/asyncer/r2dbc/mysql/ConnectionIntegrationTest.java
index b65b3b447..4e4fa34ae 100644
--- a/r2dbc-mysql/src/test/java/io/asyncer/r2dbc/mysql/ConnectionIntegrationTest.java
+++ b/r2dbc-mysql/src/test/java/io/asyncer/r2dbc/mysql/ConnectionIntegrationTest.java
@@ -579,6 +579,44 @@ void loadDataLocalInfile(String name) throws URISyntaxException, IOException {
.doOnNext(it -> assertThat(it).isEqualTo(json)));
}
+ @Test
+ public void tinyInt1isBitTrueTestValue1() {
+ complete(connection -> Mono.from(connection.createStatement("CREATE TEMPORARY TABLE `test` (`id` INT NOT NULL PRIMARY KEY, `value` TINYINT(1))").execute())
+ .flatMap(IntegrationTestSupport::extractRowsUpdated)
+ .thenMany(connection.createStatement("INSERT INTO `test` VALUES (1, 1)").execute())
+ .flatMap(IntegrationTestSupport::extractRowsUpdated)
+ .thenMany(connection.createStatement("SELECT `value` FROM `test`").execute())
+ .flatMap(result -> result.map((row, metadata) -> row.get("value", Object.class)))
+ .doOnNext(value -> assertThat(value).isInstanceOf(Boolean.class))
+ .doOnNext(value -> assertThat(value).isEqualTo(true))
+ );
+ }
+
+ @Test
+ public void tinyInt1isBitTrueTestUnsignedTinyInt1isNotBoolean() {
+ complete(connection -> Mono.from(connection.createStatement("CREATE TEMPORARY TABLE `test` (`id` INT NOT NULL PRIMARY KEY, `value` TINYINT(1) UNSIGNED)").execute())
+ .flatMap(IntegrationTestSupport::extractRowsUpdated)
+ .thenMany(connection.createStatement("INSERT INTO `test` VALUES (1, 1)").execute())
+ .flatMap(IntegrationTestSupport::extractRowsUpdated)
+ .thenMany(connection.createStatement("SELECT `value` FROM `test`").execute())
+ .flatMap(result -> result.map((row, metadata) -> row.get("value", Object.class)))
+ .doOnNext(value -> assertThat(value).isInstanceOf(Short.class))
+ .doOnNext(value -> assertThat(value).isEqualTo(Short.valueOf((short)1)))
+ );
+ }
+
+ @Test
+ public void tinyInt1isBitTrueTestValue0() {
+ complete(connection -> Mono.from(connection.createStatement("CREATE TEMPORARY TABLE `test` (`id` INT NOT NULL PRIMARY KEY, `value` TINYINT(1))").execute())
+ .flatMap(IntegrationTestSupport::extractRowsUpdated)
+ .thenMany(connection.createStatement("INSERT INTO `test` VALUES (1, 0)").execute())
+ .flatMap(IntegrationTestSupport::extractRowsUpdated)
+ .thenMany(connection.createStatement("SELECT `value` FROM `test`").execute())
+ .flatMap(result -> result.map((row, metadata) -> row.get("value", Object.class)))
+ .doOnNext(value -> assertThat(value).isInstanceOf(Boolean.class))
+ .doOnNext(value -> assertThat(value).isEqualTo(false)));
+ }
+
@Test
void batchCrud() {
// TODO: spilt it to multiple test cases and move it to BatchIntegrationTest
diff --git a/r2dbc-mysql/src/test/java/io/asyncer/r2dbc/mysql/MySqlConnectionConfigurationTest.java b/r2dbc-mysql/src/test/java/io/asyncer/r2dbc/mysql/MySqlConnectionConfigurationTest.java
index f050f4e4a..e62fea190 100644
--- a/r2dbc-mysql/src/test/java/io/asyncer/r2dbc/mysql/MySqlConnectionConfigurationTest.java
+++ b/r2dbc-mysql/src/test/java/io/asyncer/r2dbc/mysql/MySqlConnectionConfigurationTest.java
@@ -22,6 +22,8 @@
import io.asyncer.r2dbc.mysql.constant.ZeroDateOption;
import io.asyncer.r2dbc.mysql.extension.Extension;
import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.resolver.AddressResolverGroup;
+import io.netty.resolver.DefaultAddressResolverGroup;
import org.assertj.core.api.ObjectAssert;
import org.assertj.core.api.ThrowableTypeAssert;
import org.jetbrains.annotations.Nullable;
@@ -207,6 +209,32 @@ void validPasswordSupplier() {
.verifyComplete();
}
+ @Test
+ void validResolver() {
+ final AddressResolverGroup> resolver = DefaultAddressResolverGroup.INSTANCE;
+ AddressResolverGroup> resolverGroup = MySqlConnectionConfiguration.builder()
+ .host(HOST)
+ .user(USER)
+ .resolver(resolver)
+ .autodetectExtensions(false)
+ .build()
+ .getResolver();
+ assertThat(resolverGroup).isSameAs(resolver);
+ }
+
+ @Test
+ void invalidMetrics() {
+ // throw exception when metrics true without micrometer-core dependency
+ assertThatIllegalArgumentException().isThrownBy(() ->
+ MySqlConnectionConfiguration
+ .builder()
+ .host(HOST)
+ .user(USER)
+ .metrics(true)
+ .build()
+ );
+ }
+
private static MySqlConnectionConfiguration unixSocketSslMode(SslMode sslMode) {
return MySqlConnectionConfiguration.builder()
.unixSocket(UNIX_SOCKET)
diff --git a/r2dbc-mysql/src/test/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactoryProviderTest.java b/r2dbc-mysql/src/test/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactoryProviderTest.java
index ab75161c1..be48a2255 100644
--- a/r2dbc-mysql/src/test/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactoryProviderTest.java
+++ b/r2dbc-mysql/src/test/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactoryProviderTest.java
@@ -20,6 +20,8 @@
import io.asyncer.r2dbc.mysql.constant.SslMode;
import io.asyncer.r2dbc.mysql.constant.ZeroDateOption;
import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.resolver.AddressResolverGroup;
+import io.netty.resolver.DefaultAddressResolverGroup;
import io.r2dbc.spi.ConnectionFactories;
import io.r2dbc.spi.ConnectionFactoryOptions;
import io.r2dbc.spi.Option;
@@ -49,7 +51,9 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static io.asyncer.r2dbc.mysql.MySqlConnectionFactoryProvider.METRICS;
import static io.asyncer.r2dbc.mysql.MySqlConnectionFactoryProvider.PASSWORD_PUBLISHER;
+import static io.asyncer.r2dbc.mysql.MySqlConnectionFactoryProvider.RESOLVER;
import static io.asyncer.r2dbc.mysql.MySqlConnectionFactoryProvider.USE_SERVER_PREPARE_STATEMENT;
import static io.r2dbc.spi.ConnectionFactoryOptions.CONNECT_TIMEOUT;
import static io.r2dbc.spi.ConnectionFactoryOptions.DATABASE;
@@ -453,6 +457,31 @@ void validPasswordSupplier() {
assertThat(ConnectionFactories.get(options)).isExactlyInstanceOf(MySqlConnectionFactory.class);
}
+ @Test
+ void validResolver() {
+ final AddressResolverGroup> resolver = DefaultAddressResolverGroup.INSTANCE;
+ ConnectionFactoryOptions options = ConnectionFactoryOptions.builder()
+ .option(DRIVER, "mysql")
+ .option(HOST, "127.0.0.1")
+ .option(USER, "root")
+ .option(RESOLVER, resolver)
+ .build();
+
+ assertThat(ConnectionFactories.get(options)).isExactlyInstanceOf(MySqlConnectionFactory.class);
+ }
+
+ @Test
+ void invalidMetrics() {
+ // throw exception when metrics true without micrometer-core dependency
+ assertThatIllegalArgumentException().isThrownBy(() ->
+ ConnectionFactories.get(ConnectionFactoryOptions.builder()
+ .option(DRIVER, "mysql")
+ .option(HOST, "127.0.0.1")
+ .option(USER, "root")
+ .option(METRICS, true)
+ .build()));
+ }
+
@Test
void allConfigurationOptions() {
List exceptConfigs = Arrays.asList(
diff --git a/r2dbc-mysql/src/test/java/io/asyncer/r2dbc/mysql/TinyInt1isBitFalseTest.java b/r2dbc-mysql/src/test/java/io/asyncer/r2dbc/mysql/TinyInt1isBitFalseTest.java
new file mode 100644
index 000000000..9db19e414
--- /dev/null
+++ b/r2dbc-mysql/src/test/java/io/asyncer/r2dbc/mysql/TinyInt1isBitFalseTest.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2025 asyncer.io projects
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.asyncer.r2dbc.mysql;
+
+
+import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Mono;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class TinyInt1isBitFalseTest extends IntegrationTestSupport{
+ TinyInt1isBitFalseTest() {
+ super(configuration(builder -> builder.tinyInt1isBit(false)));
+ }
+
+ @Test
+ public void tinyInt1isBitFalse() {
+ complete(connection -> Mono.from(connection.createStatement("CREATE TEMPORARY TABLE `test` (`id` INT NOT NULL PRIMARY KEY, `value` TINYINT(1))").execute())
+ .flatMap(IntegrationTestSupport::extractRowsUpdated)
+ .thenMany(connection.createStatement("INSERT INTO `test` VALUES (1, 1)").execute())
+ .flatMap(IntegrationTestSupport::extractRowsUpdated)
+ .thenMany(connection.createStatement("SELECT `value` FROM `test`").execute())
+ .flatMap(result -> result.map((row, metadata) -> row.get("value", Object.class)))
+ .doOnNext(value -> assertThat(value).isInstanceOf(Byte.class)));
+ }
+
+}
diff --git a/r2dbc-mysql/src/test/java/io/asyncer/r2dbc/mysql/codec/BooleanCodecTest.java b/r2dbc-mysql/src/test/java/io/asyncer/r2dbc/mysql/codec/BooleanCodecTest.java
index 999111de5..dbfd5c104 100644
--- a/r2dbc-mysql/src/test/java/io/asyncer/r2dbc/mysql/codec/BooleanCodecTest.java
+++ b/r2dbc-mysql/src/test/java/io/asyncer/r2dbc/mysql/codec/BooleanCodecTest.java
@@ -16,12 +16,22 @@
package io.asyncer.r2dbc.mysql.codec;
+import io.asyncer.r2dbc.mysql.ConnectionContextTest;
+import io.asyncer.r2dbc.mysql.constant.MySqlType;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
+import io.r2dbc.spi.R2dbcNonTransientException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import java.nio.charset.Charset;
import java.util.Arrays;
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+
/**
* Unit tests for {@link BooleanCodec}.
*/
@@ -55,4 +65,109 @@ public ByteBuf[] binaryParameters(Charset charset) {
public ByteBuf sized(ByteBuf value) {
return value;
}
+
+ @Test
+ void decodeString() {
+ Codec codec = getCodec();
+ Charset c = ConnectionContextTest.mock().getClientCollation().getCharset();
+ byte[] bOne = new byte[]{(byte)1};
+ byte[] bZero = new byte[]{(byte)0};
+ ByteBuffer bitValOne = ByteBuffer.wrap(bOne);
+ ByteBuffer bitValZero = ByteBuffer.wrap(bZero);
+ Decoding d1 = new Decoding(Unpooled.copiedBuffer("true", c), "true", MySqlType.VARCHAR);
+ Decoding d2 = new Decoding(Unpooled.copiedBuffer("false", c), "false", MySqlType.VARCHAR);
+ Decoding d3 = new Decoding(Unpooled.copiedBuffer("1", c), "1", MySqlType.VARCHAR);
+ Decoding d4 = new Decoding(Unpooled.copiedBuffer("0", c), "0", MySqlType.VARCHAR);
+ Decoding d5 = new Decoding(Unpooled.copiedBuffer("Y", c), "Y", MySqlType.VARCHAR);
+ Decoding d6 = new Decoding(Unpooled.copiedBuffer("no", c), "no", MySqlType.VARCHAR);
+ Decoding d7 = new Decoding(Unpooled.copiedBuffer("26.57", c), "26.57", MySqlType.VARCHAR);
+ Decoding d8 = new Decoding(Unpooled.copiedBuffer("-57", c), "=57", MySqlType.VARCHAR);
+ Decoding d9 = new Decoding(Unpooled.copiedBuffer("100000", c), "100000", MySqlType.VARCHAR);
+ Decoding d10 = new Decoding(Unpooled.copiedBuffer("-12345678901234567890", c),
+ "-12345678901234567890", MySqlType.VARCHAR);
+ Decoding d11 = new Decoding(Unpooled.copiedBuffer("Banana", c), "Banana", MySqlType.VARCHAR);
+ Decoding d12 = new Decoding(Unpooled.copiedBuffer(bitValOne), bitValOne, MySqlType.BIT);
+ Decoding d13 = new Decoding(Unpooled.copiedBuffer(bitValZero), bitValZero, MySqlType.BIT);
+ Decoding d14 = new Decoding(Unpooled.copyDouble(26.57d), 26.57d, MySqlType.DOUBLE);
+ Decoding d15 = new Decoding(Unpooled.copiedBuffer(bOne), bOne, MySqlType.TINYINT);
+ Decoding d16 = new Decoding(Unpooled.copiedBuffer(bZero), bZero, MySqlType.TINYINT);
+ Decoding d17 = new Decoding(Unpooled.copiedBuffer("1e4", c), "1e4", MySqlType.VARCHAR);
+ Decoding d18 = new Decoding(Unpooled.copiedBuffer("-1.34e10", c), "-1.34e10", MySqlType.VARCHAR);
+ Decoding d19 = new Decoding(Unpooled.copiedBuffer("-0", c), "-0", MySqlType.VARCHAR);
+
+ assertThat(codec.decode(d1.content(), d1.metadata(), Boolean.class, false, ConnectionContextTest.mock()))
+ .as("Decode failed, %s", d1)
+ .isEqualTo(true);
+
+ assertThat(codec.decode(d2.content(), d2.metadata(), Boolean.class, false, ConnectionContextTest.mock()))
+ .as("Decode failed, %s", d2)
+ .isEqualTo(false);
+
+ assertThat(codec.decode(d3.content(), d3.metadata(), Boolean.class, false, ConnectionContextTest.mock()))
+ .as("Decode failed, %s", d3)
+ .isEqualTo(true);
+
+ assertThat(codec.decode(d4.content(), d4.metadata(), Boolean.class, false, ConnectionContextTest.mock()))
+ .as("Decode failed, %s", d4)
+ .isEqualTo(false);
+
+ assertThat(codec.decode(d5.content(), d5.metadata(), Boolean.class, false, ConnectionContextTest.mock()))
+ .as("Decode failed, %s", d5)
+ .isEqualTo(true);
+
+ assertThat(codec.decode(d6.content(), d6.metadata(), Boolean.class, false, ConnectionContextTest.mock()))
+ .as("Decode failed, %s", d6)
+ .isEqualTo(false);
+
+ assertThat(codec.decode(d7.content(), d7.metadata(), Boolean.class, false, ConnectionContextTest.mock()))
+ .as("Decode failed, %s", d7)
+ .isEqualTo(true);
+
+ assertThat(codec.decode(d8.content(), d8.metadata(), Boolean.class, false, ConnectionContextTest.mock()))
+ .as("Decode failed, %s", d8)
+ .isEqualTo(false);
+
+ assertThat(codec.decode(d9.content(), d9.metadata(), Boolean.class, false, ConnectionContextTest.mock()))
+ .as("Decode failed, %s", d9)
+ .isEqualTo(true);
+
+ assertThat(codec.decode(d10.content(), d10.metadata(), Boolean.class, false, ConnectionContextTest.mock()))
+ .as("Decode failed, %s", d10)
+ .isEqualTo(false);
+
+ assertThatThrownBy(() -> {codec.decode(d11.content(), d11.metadata(), Boolean.class, false, ConnectionContextTest.mock());})
+ .isInstanceOf(R2dbcNonTransientException.class);
+
+ assertThat(codec.decode(d12.content(), d12.metadata(), Boolean.class, false, ConnectionContextTest.mock()))
+ .as("Decode failed, %s", d12)
+ .isEqualTo(true);
+
+ assertThat(codec.decode(d13.content(), d13.metadata(), Boolean.class, false, ConnectionContextTest.mock()))
+ .as("Decode failed, %s", d13)
+ .isEqualTo(false);
+
+ assertThat(codec.decode(d14.content(), d14.metadata(), Boolean.class, false, ConnectionContextTest.mock()))
+ .as("Decode failed, %s", d14)
+ .isEqualTo(true);
+
+ assertThat(codec.decode(d15.content(), d15.metadata(), Boolean.class, true, ConnectionContextTest.mock()))
+ .as("Decode failed, %s", d15)
+ .isEqualTo(true);
+
+ assertThat(codec.decode(d16.content(), d16.metadata(), Boolean.class, true, ConnectionContextTest.mock()))
+ .as("Decode failed, %s", d16)
+ .isEqualTo(false);
+
+ assertThat(codec.decode(d17.content(), d17.metadata(), Boolean.class, false, ConnectionContextTest.mock()))
+ .as("Decode failed, %s", d17)
+ .isEqualTo(true);
+
+ assertThat(codec.decode(d18.content(), d18.metadata(), Boolean.class, false, ConnectionContextTest.mock()))
+ .as("Decode failed, %s", d18)
+ .isEqualTo(false);
+
+ assertThat(codec.decode(d19.content(), d19.metadata(), Boolean.class, false, ConnectionContextTest.mock()))
+ .as("Decode failed, %s", d19)
+ .isEqualTo(false);
+ }
}
diff --git a/r2dbc-mysql/src/test/java/io/asyncer/r2dbc/mysql/codec/ByteArrayInputStreamCodecTest.java b/r2dbc-mysql/src/test/java/io/asyncer/r2dbc/mysql/codec/ByteArrayInputStreamCodecTest.java
new file mode 100644
index 000000000..8c7e8b847
--- /dev/null
+++ b/r2dbc-mysql/src/test/java/io/asyncer/r2dbc/mysql/codec/ByteArrayInputStreamCodecTest.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2025 asyncer.io projects
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.asyncer.r2dbc.mysql.codec;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import org.testcontainers.shaded.org.bouncycastle.util.encoders.Hex;
+
+import java.io.ByteArrayInputStream;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+/**
+ * Unit tests for {@link ByteArrayInputStreamCodec}.
+ */
+public class ByteArrayInputStreamCodecTest implements CodecTestSupport {
+
+ private final byte[][] rawData = {
+ new byte[0],
+ new byte[] { 0x7F },
+ new byte[] { 0x12, 34, 0x56, 78, (byte) 0x9A },
+ "Hello world!".getBytes(StandardCharsets.US_ASCII),
+ new byte[] { (byte) 0xFE, (byte) 0xDC, (byte) 0xBA },
+ };
+
+ private final ByteArrayInputStream[] data = Arrays.stream(rawData)
+ .map(ByteArrayInputStream::new)
+ .toArray(ByteArrayInputStream[]::new);
+
+ @Override
+ public Codec getCodec() {
+ return ByteArrayInputStreamCodec.INSTANCE;
+ }
+
+ @Override
+ public ByteArrayInputStream[] originParameters() {
+ return data;
+ }
+
+ @Override
+ public Object[] stringifyParameters() {
+ return Arrays.stream(rawData)
+ .map(bytes -> String.format("x'%s'", Hex.toHexString(bytes)))
+ .toArray();
+ }
+
+ @Override
+ public ByteBuf[] binaryParameters(Charset charset) {
+ return Arrays.stream(rawData)
+ .map(Unpooled::wrappedBuffer)
+ .toArray(ByteBuf[]::new);
+ }
+}
diff --git a/test-native-image/pom.xml b/test-native-image/pom.xml
index e18ad93bb..36b13f6b0 100644
--- a/test-native-image/pom.xml
+++ b/test-native-image/pom.xml
@@ -5,7 +5,7 @@
4.0.0
io.asyncer
test-native-image
- 1.1.4-SNAPSHOT
+ 1.4.2-SNAPSHOT
UTF-8
@@ -15,9 +15,9 @@
8
true
- 2022.0.16
+ 2024.0.3
1.0.0.RELEASE
- 20.3.13
+ 20.3.17
@@ -67,7 +67,12 @@
${skipNativeImage}
${project.artifactId}
io.asyncer.Main
- --report-unsupported-elements-at-runtime --allow-incomplete-classpath --initialize-at-run-time=io.netty.handler.ssl.BouncyCastleAlpnSslUtils
+
+ --report-unsupported-elements-at-runtime
+ --allow-incomplete-classpath
+ --initialize-at-run-time=io.netty.handler.ssl.BouncyCastleAlpnSslUtils
+ --initialize-at-run-time=io.netty.handler.ssl.JdkSslServerContext
+