Skip to content

Commit 19d3b21

Browse files
committed
trying to use netty EventLoop
1 parent 61c5016 commit 19d3b21

File tree

9 files changed

+35
-18
lines changed

9 files changed

+35
-18
lines changed

build.gradle

+2-2
Original file line numberDiff line numberDiff line change
@@ -61,10 +61,10 @@ dependencies {
6161
provided "org.scala-lang:scala-compiler:$scalaVersion"
6262
provided "io.vertx:lang-scala:$scalaLangModVersion"
6363

64-
compile("com.github.mauricio:postgresql-async_2.10:0.2.4") {
64+
compile("com.github.mauricio:postgresql-async_2.10:$asyncDriverVersion") {
6565
exclude group: 'org.scala-lang'
6666
}
67-
compile("com.github.mauricio:mysql-async_2.10:0.2.4") {
67+
compile("com.github.mauricio:mysql-async_2.10:$asyncDriverVersion") {
6868
exclude group: 'org.scala-lang'
6969
}
7070
}

gradle.properties

+3
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ modname=async-mysql-postgresql
77
# Your module version
88
version=0.1.0-SNAPSHOT
99

10+
# The version of mauricios async driver
11+
asyncDriverVersion=0.2.5-SNAPSHOT
12+
1013
# The test timeout in seconds
1114
testtimeout=5
1215

src/main/resources/langs.properties

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
scala=io.vertx~lang-scala~2.0.0-SNAPSHOT:org.vertx.scala.platform.impl.ScalaVerticleFactory
2+
.scala=scala

src/main/scala/com/campudus/vertx/busmod/ScalaBusMod.scala

+3-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@ trait ScalaBusMod extends MessageHelper with VertxExecutionContext with (Message
2424

2525
fut map { reply => msg.reply(reply.toJson) } recover {
2626
// case x: BusException => msg.reply(new JsonObject().putString("status", "error").putString("message", x.getMessage()).putString("id", x.getId()))
27-
case x => msg.reply(new JsonObject().putString("status", "error").putString("message", x.getMessage()))
27+
case x =>
28+
x.printStackTrace(System.err)
29+
msg.reply(new JsonObject().putString("status", "error").putString("message", x.getMessage()))
2830
}
2931
}
3032

src/main/scala/com/campudus/vertx/database/ConnectionHandler.scala

+3-5
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,11 @@ import com.github.mauricio.async.db.Connection
99
import com.campudus.vertx.busmod.ScalaBusMod
1010
import scala.concurrent.Future
1111
import com.campudus.vertx.database.pool.AsyncConnectionPool
12+
import org.vertx.scala.core.Vertx
1213

13-
class ConnectionHandler(dbType: String, config: Configuration) extends ScalaBusMod {
14+
class ConnectionHandler(vertx: Vertx, dbType: String, config: Configuration) extends ScalaBusMod {
1415

15-
val pool = dbType match {
16-
case "postgresql" => new PostgreSQLAsyncConnectionPool(config)
17-
case _ => ???
18-
}
16+
val pool = AsyncConnectionPool(vertx, dbType, config)
1917

2018
override def receive(msg: Message[JsonObject]) = {
2119
case "query" => pool.withConnection({ c: Connection =>

src/main/scala/com/campudus/vertx/database/Starter.scala

+10-6
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ class Starter extends Verticle {
1616

1717
var connection: String = null
1818
var configuration: Configuration = null
19-
var pool: ConnectionHandler = null
19+
var handler: ConnectionHandler = null
2020

2121
override def start(startedResult: org.vertx.scala.core.Future[Void]) = {
2222

@@ -37,14 +37,14 @@ class Starter extends Verticle {
3737
val port = config.getInteger("port", defaultPortFor(connection))
3838
val username = config.getString("username", defaultUserFor(connection))
3939
val password = Option(config.getString("password")).orElse(defaultPasswordFor(connection))
40-
val database = Option(config.getString("database"))
40+
val database = Option(config.getString("database")).orElse(defaultDatabaseFor(connection))
4141

4242
configuration = Configuration(username, host, port, password, database)
4343

44-
pool = new ConnectionHandler(connection, configuration)
45-
vertx.eventBus.registerHandler(address)(pool)
44+
handler = new ConnectionHandler(vertx, connection, configuration)
45+
vertx.eventBus.registerHandler(address)(handler)
4646

47-
logger.error("Async database module for MySQL and PostgreSQL started.")
47+
logger.error("Async database module for MySQL and PostgreSQL started with config " + configuration)
4848

4949
startedResult.setResult(null)
5050
} catch {
@@ -55,14 +55,18 @@ class Starter extends Verticle {
5555
}
5656

5757
override def stop() {
58-
Option(pool).map(_.close)
58+
Option(handler).map(_.close)
5959
}
6060

6161
private def defaultPortFor(connection: String): Integer = connection match {
6262
case "postgresql" => 5432
6363
case "mysql" => 3306
6464
}
6565

66+
private def defaultDatabaseFor(connection: String): Option[String] = connection match {
67+
case _ => Some("testdb")
68+
}
69+
6670
private def defaultUserFor(connection: String): String = connection match {
6771
case "postgresql" => "vertx"
6872
case "mysql" => "root"

src/main/scala/com/campudus/vertx/database/pool/AsyncConnectionPool.scala

+7-2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import scala.concurrent.ExecutionContext
77
import scala.concurrent.Promise
88
import scala.util.Failure
99
import scala.util.Success
10+
import org.vertx.scala.core.Vertx
11+
import org.vertx.java.core.impl.EventLoopContext
1012

1113
trait AsyncConnectionPool[ConnType <: Connection] {
1214

@@ -36,8 +38,11 @@ trait AsyncConnectionPool[ConnType <: Connection] {
3638

3739
object AsyncConnectionPool {
3840

39-
def apply(dbType: String, config: Configuration) = dbType match {
40-
case "postgresql" => new PostgreSQLAsyncConnectionPool(config)
41+
def apply(vertx: Vertx, dbType: String, config: Configuration) = dbType match {
42+
case "postgresql" =>
43+
new PostgreSQLAsyncConnectionPool(
44+
config,
45+
vertx.internal.currentContext().asInstanceOf[EventLoopContext].getEventLoop())
4146
case _ => throw new NotImplementedError
4247
}
4348

src/main/scala/com/campudus/vertx/database/pool/PostgreSQLAsyncConnectionPool.scala

+4-2
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,12 @@ import scala.concurrent.Future
66
import scala.concurrent.ExecutionContext
77
import com.campudus.vertx.VertxExecutionContext
88
import com.github.mauricio.async.db.Connection
9+
import org.vertx.java.core.impl.EventLoopContext
10+
import io.netty.channel.EventLoop
911

10-
class PostgreSQLAsyncConnectionPool(config: Configuration, implicit val executionContext: ExecutionContext = VertxExecutionContext) extends AsyncConnectionPool[PostgreSQLConnection] {
12+
class PostgreSQLAsyncConnectionPool(config: Configuration, eventLoop: EventLoop, implicit val executionContext: ExecutionContext = VertxExecutionContext) extends AsyncConnectionPool[PostgreSQLConnection] {
1113

12-
override def take() = new PostgreSQLConnection(config).connect
14+
override def take() = new PostgreSQLConnection(configuration = config, group = eventLoop).connect
1315

1416
override def giveBack(connection: Connection) = {
1517
connection.disconnect map (_ => this) recover {

src/test/scala/com/campudus/test/postgresql/PostgreSQLTest.scala

+1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ class PostgreSQLTest extends TestVerticle with VertxScalaHelpers {
3636
}
3737

3838
private def expectOk(q: JsonObject): Future[JsonObject] = ebSend(q) map { reply =>
39+
logger.error("got reply: " + reply.encode())
3940
assertEquals("ok", reply.getString("status"))
4041
reply
4142
}

0 commit comments

Comments
 (0)