Skip to content

Commit 27b2930

Browse files
committed
Merge pull request vert-x#12 from campudus/postgresql_async_0.2.9
Postgresql async 0.2.9
2 parents 2f34968 + f474025 commit 27b2930

File tree

7 files changed

+101
-73
lines changed

7 files changed

+101
-73
lines changed

README.md

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -104,8 +104,6 @@ Creates a prepared statement and lets you fill the `?` with values.
104104

105105
Takes several statements and wraps them into a single transaction for the server to process. Use `statement : [...actions...]` to create such a transaction. Only `select`, `insert` and `raw` commands are allowed right now.
106106

107-
Be aware: This is only tested and working with PostgreSQL currently.
108-
109107
{
110108
"action" : "transaction",
111109
"statements" : [
@@ -117,12 +115,12 @@ Be aware: This is only tested and working with PostgreSQL currently.
117115
},
118116
{
119117
"action" : "raw",
120-
"command" : "UPDATE account SET balance=balance+? WHERE name=?",
118+
"command" : "UPDATE account SET balance=balance+1 WHERE name='Mr. Test'",
121119
},
122120
{
123-
"action" : "select",
124-
"table" : "account",
125-
"fields" : ["balance"]
121+
"action" : "prepared",
122+
"statement" : "UPDATE account SET balance=balance+? WHERE name=?",
123+
"values" : [25, 'Mr. Test']
126124
}
127125
]
128126
}

gradle.properties

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ modname=mod-mysql-postgresql
88
version=0.2.0-SNAPSHOT
99

1010
# The version of mauricios async driver
11-
asyncDriverVersion=0.2.8
11+
asyncDriverVersion=0.2.9
1212

1313
# The test timeout in seconds
1414
testtimeout=5
@@ -29,7 +29,7 @@ scalaVersion=2.10.2
2929
gradleVersion=1.6
3030

3131
# The version of Vert.x
32-
vertxVersion=2.1M1
32+
vertxVersion=2.1M2
3333

3434
# The version of Vert.x test tools
3535
toolsVersion=2.0.2-final

src/main/scala/io/vertx/asyncsql/database/ConnectionHandler.scala

Lines changed: 35 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@ trait ConnectionHandler extends ScalaBusMod with VertxScalaHelpers {
2828
override def asyncReceive(msg: Message[JsonObject]) = {
2929
case "select" => select(msg.body)
3030
case "insert" => insert(msg.body)
31-
case "prepared" => prepared(msg.body)
31+
case "prepared" => sendWithPool(prepared(msg.body))
3232
case "transaction" => transaction(msg.body)
33-
case "raw" => rawCommand(msg.body.getString("command"))
33+
case "raw" => sendWithPool(rawCommand(msg.body.getString("command")))
3434
}
3535

3636
def close() = pool.close
@@ -53,7 +53,7 @@ trait ConnectionHandler extends ScalaBusMod with VertxScalaHelpers {
5353
}
5454

5555
protected def select(json: JsonObject): Future[Reply] = pool.withConnection({ c: Connection =>
56-
rawCommand(selectCommand(json))
56+
sendWithPool(rawCommand(selectCommand(json)))
5757
})
5858

5959
protected def insertCommand(json: JsonObject): String = {
@@ -74,43 +74,49 @@ trait ConnectionHandler extends ScalaBusMod with VertxScalaHelpers {
7474
}
7575

7676
protected def insert(json: JsonObject): Future[Reply] = {
77-
rawCommand(insertCommand(json))
77+
sendWithPool(rawCommand(insertCommand(json)))
7878
}
7979

80+
sealed trait CommandType { val query: Connection => Future[QueryResult] }
81+
case class Raw(stmt: String) extends CommandType { val query = rawCommand(stmt) }
82+
case class Prepared(json: JsonObject) extends CommandType { val query = prepared(json) }
83+
8084
protected def transaction(json: JsonObject): Future[Reply] = pool.withConnection({ c: Connection =>
8185
logger.info("TRANSACTION-JSON: " + json.encodePrettily())
86+
8287
Option(json.getArray("statements")) match {
83-
case Some(statements) => rawCommand((statements.asScala.map {
84-
case js: JsonObject =>
85-
js.getString("action") match {
86-
case "select" => selectCommand(js)
87-
case "insert" => insertCommand(js)
88-
case "raw" => js.getString("command")
89-
}
90-
case _ => throw new IllegalArgumentException("'statements' needs JsonObjects!")
91-
}).mkString(transactionStart, statementDelimiter, statementDelimiter + transactionEnd))
88+
case Some(statements) => c.inTransaction { conn: Connection =>
89+
val futures = (statements.asScala.map {
90+
case js: JsonObject =>
91+
js.getString("action") match {
92+
case "select" => Raw(selectCommand(js))
93+
case "insert" => Raw(insertCommand(js))
94+
case "prepared" => Prepared(js)
95+
case "raw" => Raw(js.getString("command"))
96+
}
97+
case _ => throw new IllegalArgumentException("'statements' needs JsonObjects!")
98+
})
99+
val f = (futures.foldLeft(Future[Any]()) { case (f, cmd) => f flatMap (_ => cmd.query(conn)) })
100+
f map (_ => Ok(Json.obj()))
101+
}
92102
case None => throw new IllegalArgumentException("No 'statements' field in request!")
93103
}
94104
})
95105

96-
protected def prepared(json: JsonObject): Future[Reply] = pool.withConnection({ c: Connection =>
97-
c.sendPreparedStatement(json.getString("statement"), json.getArray("values").toArray()) map buildResults recover {
106+
protected def sendWithPool(fn: Connection => Future[QueryResult]): Future[Reply] = pool.withConnection({ c: Connection =>
107+
fn(c) map buildResults recover {
98108
case x: GenericDatabaseException =>
99109
Error(x.errorMessage.message)
100110
case x =>
101111
Error(x.getMessage())
102112
}
103113
})
104114

105-
protected def rawCommand(command: String): Future[Reply] = pool.withConnection({ c: Connection =>
106-
logger.info("sending command: " + command)
107-
c.sendQuery(command) map buildResults recover {
108-
case x: GenericDatabaseException =>
109-
Error(x.errorMessage.message)
110-
case x =>
111-
Error(x.getMessage())
112-
}
113-
})
115+
protected def prepared(json: JsonObject): Connection => Future[QueryResult] = { c: Connection =>
116+
c.sendPreparedStatement(json.getString("statement"), json.getArray("values").toArray())
117+
}
118+
119+
protected def rawCommand(command: String): Connection => Future[QueryResult] = { c: Connection => c.sendQuery(command) }
114120

115121
private def buildResults(qr: QueryResult): Reply = {
116122
val result = new JsonObject()
@@ -123,13 +129,9 @@ trait ConnectionHandler extends ScalaBusMod with VertxScalaHelpers {
123129
arr.addString(name)
124130
}
125131

126-
val rows = Json.arr((for {
127-
rowData <- resultSet
128-
} yield Json.arr((for {
129-
columnName <- resultSet.columnNames
130-
} yield {
131-
rowData(columnName)
132-
}): _*)): _*)
132+
val rows = (new JsonArray() /: resultSet) { (arr, rowData) =>
133+
arr.add(rowDataToJsonArray(rowData))
134+
}
133135

134136
result.putArray("fields", fields)
135137
result.putArray("results", rows)
@@ -138,4 +140,6 @@ trait ConnectionHandler extends ScalaBusMod with VertxScalaHelpers {
138140

139141
Ok(result)
140142
}
143+
144+
private def rowDataToJsonArray(rowData: RowData): JsonArray = Json.arr(rowData.toList: _*)
141145
}

src/test/scala/io/vertx/asyncsql/test/BaseSqlTests.scala

Lines changed: 52 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ trait BaseSqlTests { this: SqlTestVerticle =>
1414
sth <- fn
1515
_ <- dropTable(tableName)
1616
} yield sth) recoverWith {
17-
case _ =>
18-
dropTable(tableName)
17+
case x =>
18+
dropTable(tableName) map (_ => throw x)
1919
}
2020
}
2121

@@ -34,7 +34,7 @@ trait BaseSqlTests { this: SqlTestVerticle =>
3434
expectOk(raw("SELECT 0")) map { reply =>
3535
val res = reply.getArray("results")
3636
assertEquals(1, res.size())
37-
assertEquals(0, res.get[JsonArray](0).get[Int](0))
37+
assertEquals(0, res.get[JsonArray](0).get[Number](0).intValue())
3838
}
3939
}
4040

@@ -43,8 +43,8 @@ trait BaseSqlTests { this: SqlTestVerticle =>
4343
val res = reply.getArray("results")
4444
assertEquals(1, res.size())
4545
val firstElem = res.get[JsonArray](0)
46-
assertEquals(1, firstElem.get[Integer](0))
47-
assertEquals(0, firstElem.get[Integer](1))
46+
assertEquals(1, firstElem.get[Number](0).intValue())
47+
assertEquals(0, firstElem.get[Number](1).intValue())
4848
}
4949
}
5050

@@ -54,21 +54,25 @@ trait BaseSqlTests { this: SqlTestVerticle =>
5454
val receivedFields = reply.getArray("fields")
5555
val results = reply.getArray("results").get[JsonArray](0)
5656

57-
assertEquals(1, reply.getNumber("rows"))
57+
assertEquals(1, reply.getInteger("rows"))
5858

5959
val columnNamesList = receivedFields.asScala.toList
6060

6161
assertEquals("Mr. Test", results.get(columnNamesList.indexOf("name")))
6262
assertEquals("test@example.com", results.get(columnNamesList.indexOf("email")))
63-
assertEquals(15, results.get(columnNamesList.indexOf("age")))
64-
assertEquals(true, results.get(columnNamesList.indexOf("is_male")))
65-
assertEquals(167.31, results.get(columnNamesList.indexOf("money")), 0.1)
63+
assertEquals(15, results.get[Int](columnNamesList.indexOf("age")))
64+
assertTrue(results.get[Any](columnNamesList.indexOf("is_male")) match {
65+
case b: Boolean => b
66+
case i: Number => i.intValue() == 1
67+
case x => false
68+
})
69+
assertEquals(167.31, results.get[Number](columnNamesList.indexOf("money")).doubleValue(), 0.01)
6670
}
6771
}
6872

6973
def createAndDropTable(): Unit = asyncTest {
7074
createTable("some_test") flatMap (_ => dropTable("some_test")) map { reply =>
71-
assertEquals(0, reply.getNumber("rows"))
75+
assertEquals(0, reply.getInteger("rows"))
7276
}
7377
}
7478

@@ -109,22 +113,21 @@ trait BaseSqlTests { this: SqlTestVerticle =>
109113
assertFieldName("age")
110114
assertFieldName("money")
111115
assertFieldName("wedding_date")
116+
val moneyField = receivedFields.toArray().indexOf("money")
112117

113118
val mrTest = reply.getArray("results").get[JsonArray](0)
114119
assertTrue(mrTest.contains("Mr. Test"))
115120
assertTrue(mrTest.contains("test@example.com"))
116121
assertTrue(mrTest.contains(true) || mrTest.contains(1))
117122
assertTrue(mrTest.contains(15))
118-
assertTrue(mrTest.contains(167.31))
123+
assertEquals(167.31, mrTest.get[Number](moneyField).doubleValue(), 0.0001)
119124
}
120125
}
121126

122127
def selectEverything(): Unit = typeTestInsert {
123128
val fieldsArray = Json.arr("name", "email", "is_male", "age", "money", "wedding_date")
124129
expectOk(select("some_test", fieldsArray)) map { reply =>
125130
val receivedFields = reply.getArray("fields")
126-
logger.info("received: " + receivedFields.encode())
127-
logger.info("fieldsAr: " + fieldsArray.encode())
128131
checkSameFields(fieldsArray, receivedFields)
129132
val results = reply.getArray("results")
130133
val mrTest = results.get[JsonArray](0)
@@ -147,9 +150,13 @@ trait BaseSqlTests { this: SqlTestVerticle =>
147150
private def checkMrTest(mrTest: JsonArray) = {
148151
assertEquals("Mr. Test", mrTest.get[String](0))
149152
assertEquals("test@example.com", mrTest.get[String](1))
150-
assertTrue(mrTest.get[Boolean](2) == true || mrTest.get[Integer](2) == 1)
151-
assertEquals(15, mrTest.get[Integer](3))
152-
assertEquals(167.31, mrTest.get[Integer](4))
153+
assertTrue(mrTest.get[Any](2) match {
154+
case b: Boolean => b
155+
case i: Number => i.intValue() == 1
156+
case x => false
157+
})
158+
assertEquals(15, mrTest.get[Number](3).intValue())
159+
assertEquals(167.31, mrTest.get[Number](4).doubleValue(), 0.0001)
153160
// FIXME check date conversion
154161
// assertEquals("2024-04-01", mrTest.get[JsonObject](5))
155162
}
@@ -158,8 +165,8 @@ trait BaseSqlTests { this: SqlTestVerticle =>
158165
assertEquals("Mrs. Test", mrsTest.get[String](0))
159166
assertEquals("test2@example.com", mrsTest.get[String](1))
160167
assertEquals(false, mrsTest.get[Boolean](2))
161-
assertEquals(43, mrsTest.get[Integer](3))
162-
assertEquals(167.31, mrsTest.get[Integer](4))
168+
assertEquals(43L, mrsTest.get[Long](3))
169+
assertEquals(167.31, mrsTest.get[Number](4).doubleValue(), 0.0001)
163170
// FIXME check date conversion
164171
// assertEquals("1997-12-24", mrsTest.get[JsonObject](5))
165172
}
@@ -194,15 +201,32 @@ trait BaseSqlTests { this: SqlTestVerticle =>
194201
}
195202

196203
def transaction(): Unit = typeTestInsert {
197-
expectOk(
198-
transaction(
199-
insert("some_test", Json.arr("name", "email", "is_male", "age", "money"),
200-
Json.arr(Json.arr("Mr. Test jr.", "test3@example.com", true, 5, 2))),
201-
raw("SELECT SUM(age) FROM some_test WHERE is_male = true")))
202-
.map { reply =>
203-
val results = reply.getArray("results")
204-
assertEquals(1, results.size())
205-
assertEquals(20, results.get[JsonArray](0).get[Int](0))
206-
}
204+
(for {
205+
a <- expectOk(
206+
transaction(
207+
insert("some_test", Json.arr("name", "email", "is_male", "age", "money"),
208+
Json.arr(Json.arr("Mr. Test jr.", "test3@example.com", true, 5, 2))),
209+
raw("UPDATE some_test SET age=6 WHERE name = 'Mr. Test jr.'")))
210+
b <- expectOk(raw("SELECT SUM(age) FROM some_test WHERE is_male = true"))
211+
} yield b) map { reply =>
212+
val results = reply.getArray("results")
213+
assertEquals(1, results.size())
214+
assertEquals(21, results.get[JsonArray](0).get[Number](0).intValue())
215+
}
216+
}
217+
218+
def transactionWithPreparedStatement(): Unit = typeTestInsert {
219+
(for {
220+
a <- expectOk(
221+
transaction(
222+
insert("some_test", Json.arr("name", "email", "is_male", "age", "money"),
223+
Json.arr(Json.arr("Mr. Test jr.", "test3@example.com", true, 5, 2))),
224+
prepared("UPDATE some_test SET age=? WHERE name=?", Json.arr(6, "Mr. Test jr."))))
225+
b <- expectOk(raw("SELECT SUM(age) FROM some_test WHERE is_male = true"))
226+
} yield b) map { reply =>
227+
val results = reply.getArray("results")
228+
assertEquals(1, results.size())
229+
assertEquals(21, results.get[JsonArray](0).get[Number](0).intValue())
230+
}
207231
}
208232
}

src/test/scala/io/vertx/asyncsql/test/SqlTestVerticle.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ abstract class SqlTestVerticle extends TestVerticle with BaseVertxIntegrationTes
6565
protected def transaction(statements: JsonObject*) = Json.obj("action" -> "transaction", "statements" -> Json.arr(statements: _*))
6666

6767
protected def createTable(tableName: String) = expectOk(raw(createTableStatement(tableName))) map { reply =>
68-
assertEquals(0, reply.getNumber("rows"))
68+
assertEquals(0, reply.getInteger("rows"))
6969
reply
7070
}
7171

@@ -81,7 +81,7 @@ CREATE TABLE """ + tableName + """ (
8181
email VARCHAR(255) UNIQUE,
8282
is_male BOOLEAN,
8383
age INT,
84-
money FLOAT,
84+
money DOUBLE PRECISION,
8585
wedding_date DATE
8686
);
8787
"""

src/test/scala/io/vertx/asyncsql/test/mysql/MySqlTest.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,9 @@ CREATE TABLE """ + tableName + """ (
5050
override def selectFiltered(): Unit = super.selectFiltered()
5151
@Test
5252
override def preparedSelect(): Unit = super.preparedSelect()
53-
54-
// @Ignore("not working currently")
55-
// @Test
56-
// override def transaction(): Unit = super.transaction()
53+
@Test
54+
override def transaction(): Unit = super.transaction()
55+
@Test
56+
override def transactionWithPreparedStatement(): Unit = super.transactionWithPreparedStatement()
5757

5858
}

src/test/scala/io/vertx/asyncsql/test/postgresql/PostgreSqlTest.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,5 +38,7 @@ class PostgreSqlTest extends SqlTestVerticle with BaseSqlTests {
3838
override def preparedSelect(): Unit = super.preparedSelect()
3939
@Test
4040
override def transaction(): Unit = super.transaction()
41+
@Test
42+
override def transactionWithPreparedStatement(): Unit = super.transactionWithPreparedStatement()
4143

4244
}

0 commit comments

Comments
 (0)