Skip to content

Commit 3f13722

Browse files
committed
new postgresql-async version
prepared statements are now possible in a transaction new tests and test fixes
1 parent 2f34968 commit 3f13722

File tree

5 files changed

+82
-49
lines changed

5 files changed

+82
-49
lines changed

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ modname=mod-mysql-postgresql
88
version=0.2.0-SNAPSHOT
99

1010
# The version of mauricios async driver
11-
asyncDriverVersion=0.2.8
11+
asyncDriverVersion=0.2.9
1212

1313
# The test timeout in seconds
1414
testtimeout=5

src/main/scala/io/vertx/asyncsql/database/ConnectionHandler.scala

Lines changed: 36 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@ trait ConnectionHandler extends ScalaBusMod with VertxScalaHelpers {
2828
override def asyncReceive(msg: Message[JsonObject]) = {
2929
case "select" => select(msg.body)
3030
case "insert" => insert(msg.body)
31-
case "prepared" => prepared(msg.body)
31+
case "prepared" => sendWithPool(prepared(msg.body))
3232
case "transaction" => transaction(msg.body)
33-
case "raw" => rawCommand(msg.body.getString("command"))
33+
case "raw" => sendWithPool(rawCommand(msg.body.getString("command")))
3434
}
3535

3636
def close() = pool.close
@@ -53,7 +53,7 @@ trait ConnectionHandler extends ScalaBusMod with VertxScalaHelpers {
5353
}
5454

5555
protected def select(json: JsonObject): Future[Reply] = pool.withConnection({ c: Connection =>
56-
rawCommand(selectCommand(json))
56+
sendWithPool(rawCommand(selectCommand(json)))
5757
})
5858

5959
protected def insertCommand(json: JsonObject): String = {
@@ -74,43 +74,50 @@ trait ConnectionHandler extends ScalaBusMod with VertxScalaHelpers {
7474
}
7575

7676
protected def insert(json: JsonObject): Future[Reply] = {
77-
rawCommand(insertCommand(json))
77+
sendWithPool(rawCommand(insertCommand(json)))
7878
}
7979

80+
sealed trait CommandType { val query: Connection => Future[QueryResult] }
81+
case class Raw(stmt: String) extends CommandType { val query = rawCommand(stmt) }
82+
case class Prepared(json: JsonObject) extends CommandType { val query = prepared(json) }
83+
8084
protected def transaction(json: JsonObject): Future[Reply] = pool.withConnection({ c: Connection =>
8185
logger.info("TRANSACTION-JSON: " + json.encodePrettily())
86+
8287
Option(json.getArray("statements")) match {
83-
case Some(statements) => rawCommand((statements.asScala.map {
84-
case js: JsonObject =>
85-
js.getString("action") match {
86-
case "select" => selectCommand(js)
87-
case "insert" => insertCommand(js)
88-
case "raw" => js.getString("command")
89-
}
90-
case _ => throw new IllegalArgumentException("'statements' needs JsonObjects!")
91-
}).mkString(transactionStart, statementDelimiter, statementDelimiter + transactionEnd))
88+
case Some(statements) => c.inTransaction { conn: Connection =>
89+
val futures = (statements.asScala.map {
90+
case js: JsonObject =>
91+
js.getString("action") match {
92+
case "select" => Raw(selectCommand(js))
93+
case "insert" => Raw(insertCommand(js))
94+
case "prepared" => Prepared(js)
95+
case "raw" => Raw(js.getString("command"))
96+
}
97+
case _ => throw new IllegalArgumentException("'statements' needs JsonObjects!")
98+
})
99+
val f = (futures.foldLeft(Future[Any]()) { case (f, cmd) => f flatMap (_ => cmd.query(conn)) })
100+
f map (_ => Ok(Json.obj()))
101+
}
102+
//.mkString(transactionStart, statementDelimiter, statementDelimiter + transactionEnd))
92103
case None => throw new IllegalArgumentException("No 'statements' field in request!")
93104
}
94105
})
95106

96-
protected def prepared(json: JsonObject): Future[Reply] = pool.withConnection({ c: Connection =>
97-
c.sendPreparedStatement(json.getString("statement"), json.getArray("values").toArray()) map buildResults recover {
107+
protected def sendWithPool(fn: Connection => Future[QueryResult]): Future[Reply] = pool.withConnection({ c: Connection =>
108+
fn(c) map buildResults recover {
98109
case x: GenericDatabaseException =>
99110
Error(x.errorMessage.message)
100111
case x =>
101112
Error(x.getMessage())
102113
}
103114
})
104115

105-
protected def rawCommand(command: String): Future[Reply] = pool.withConnection({ c: Connection =>
106-
logger.info("sending command: " + command)
107-
c.sendQuery(command) map buildResults recover {
108-
case x: GenericDatabaseException =>
109-
Error(x.errorMessage.message)
110-
case x =>
111-
Error(x.getMessage())
112-
}
113-
})
116+
protected def prepared(json: JsonObject): Connection => Future[QueryResult] = { c: Connection =>
117+
c.sendPreparedStatement(json.getString("statement"), json.getArray("values").toArray())
118+
}
119+
120+
protected def rawCommand(command: String): Connection => Future[QueryResult] = { c: Connection => c.sendQuery(command) }
114121

115122
private def buildResults(qr: QueryResult): Reply = {
116123
val result = new JsonObject()
@@ -123,13 +130,9 @@ trait ConnectionHandler extends ScalaBusMod with VertxScalaHelpers {
123130
arr.addString(name)
124131
}
125132

126-
val rows = Json.arr((for {
127-
rowData <- resultSet
128-
} yield Json.arr((for {
129-
columnName <- resultSet.columnNames
130-
} yield {
131-
rowData(columnName)
132-
}): _*)): _*)
133+
val rows = (new JsonArray() /: resultSet) { (arr, rowData) =>
134+
arr.add(rowDataToJsonArray(rowData))
135+
}
133136

134137
result.putArray("fields", fields)
135138
result.putArray("results", rows)
@@ -138,4 +141,6 @@ trait ConnectionHandler extends ScalaBusMod with VertxScalaHelpers {
138141

139142
Ok(result)
140143
}
144+
145+
private def rowDataToJsonArray(rowData: RowData): JsonArray = Json.arr(rowData.toList: _*)
141146
}

src/test/scala/io/vertx/asyncsql/test/BaseSqlTests.scala

Lines changed: 41 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ trait BaseSqlTests { this: SqlTestVerticle =>
1414
sth <- fn
1515
_ <- dropTable(tableName)
1616
} yield sth) recoverWith {
17-
case _ =>
18-
dropTable(tableName)
17+
case x =>
18+
dropTable(tableName) map (_ => throw x)
1919
}
2020
}
2121

@@ -60,9 +60,13 @@ trait BaseSqlTests { this: SqlTestVerticle =>
6060

6161
assertEquals("Mr. Test", results.get(columnNamesList.indexOf("name")))
6262
assertEquals("test@example.com", results.get(columnNamesList.indexOf("email")))
63-
assertEquals(15, results.get(columnNamesList.indexOf("age")))
64-
assertEquals(true, results.get(columnNamesList.indexOf("is_male")))
65-
assertEquals(167.31, results.get(columnNamesList.indexOf("money")), 0.1)
63+
assertEquals(15, results.get[Int](columnNamesList.indexOf("age")))
64+
assertTrue(results.get[Any](columnNamesList.indexOf("is_male")) match {
65+
case b: Boolean => b
66+
case i: Int => i == 1
67+
case x => false
68+
})
69+
assertEquals(167.31, results.get(columnNamesList.indexOf("money")), 0.1)
6670
}
6771
}
6872

@@ -147,7 +151,11 @@ trait BaseSqlTests { this: SqlTestVerticle =>
147151
private def checkMrTest(mrTest: JsonArray) = {
148152
assertEquals("Mr. Test", mrTest.get[String](0))
149153
assertEquals("test@example.com", mrTest.get[String](1))
150-
assertTrue(mrTest.get[Boolean](2) == true || mrTest.get[Integer](2) == 1)
154+
assertTrue(mrTest.get[Any](2) match {
155+
case b: Boolean => b
156+
case i: Int => i == 1
157+
case x => false
158+
})
151159
assertEquals(15, mrTest.get[Integer](3))
152160
assertEquals(167.31, mrTest.get[Integer](4))
153161
// FIXME check date conversion
@@ -194,15 +202,32 @@ trait BaseSqlTests { this: SqlTestVerticle =>
194202
}
195203

196204
def transaction(): Unit = typeTestInsert {
197-
expectOk(
198-
transaction(
199-
insert("some_test", Json.arr("name", "email", "is_male", "age", "money"),
200-
Json.arr(Json.arr("Mr. Test jr.", "test3@example.com", true, 5, 2))),
201-
raw("SELECT SUM(age) FROM some_test WHERE is_male = true")))
202-
.map { reply =>
203-
val results = reply.getArray("results")
204-
assertEquals(1, results.size())
205-
assertEquals(20, results.get[JsonArray](0).get[Int](0))
206-
}
205+
(for {
206+
a <- expectOk(
207+
transaction(
208+
insert("some_test", Json.arr("name", "email", "is_male", "age", "money"),
209+
Json.arr(Json.arr("Mr. Test jr.", "test3@example.com", true, 5, 2))),
210+
raw("UPDATE some_test SET age=6 WHERE name = 'Mr. Test jr.'")))
211+
b <- expectOk(raw("SELECT SUM(age) FROM some_test WHERE is_male = true"))
212+
} yield b) map { reply =>
213+
val results = reply.getArray("results")
214+
assertEquals(1, results.size())
215+
assertEquals(21, results.get[JsonArray](0).get[Int](0))
216+
}
217+
}
218+
219+
def transactionWithPreparedStatement(): Unit = typeTestInsert {
220+
(for {
221+
a <- expectOk(
222+
transaction(
223+
insert("some_test", Json.arr("name", "email", "is_male", "age", "money"),
224+
Json.arr(Json.arr("Mr. Test jr.", "test3@example.com", true, 5, 2))),
225+
prepared("UPDATE some_test SET age=? WHERE name=?", Json.arr(6, "Mr. Test jr."))))
226+
b <- expectOk(raw("SELECT SUM(age) FROM some_test WHERE is_male = true"))
227+
} yield b) map { reply =>
228+
val results = reply.getArray("results")
229+
assertEquals(1, results.size())
230+
assertEquals(21, results.get[JsonArray](0).get[Int](0))
231+
}
207232
}
208233
}

src/test/scala/io/vertx/asyncsql/test/mysql/MySqlTest.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,8 @@ CREATE TABLE """ + tableName + """ (
5050
override def selectFiltered(): Unit = super.selectFiltered()
5151
@Test
5252
override def preparedSelect(): Unit = super.preparedSelect()
53-
53+
@Test
54+
override def transactionWithPreparedStatement(): Unit = super.transactionWithPreparedStatement()
5455
// @Ignore("not working currently")
5556
// @Test
5657
// override def transaction(): Unit = super.transaction()

src/test/scala/io/vertx/asyncsql/test/postgresql/PostgreSqlTest.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,5 +38,7 @@ class PostgreSqlTest extends SqlTestVerticle with BaseSqlTests {
3838
override def preparedSelect(): Unit = super.preparedSelect()
3939
@Test
4040
override def transaction(): Unit = super.transaction()
41+
@Test
42+
override def transactionWithPreparedStatement(): Unit = super.transactionWithPreparedStatement()
4143

4244
}

0 commit comments

Comments
 (0)