@@ -28,9 +28,9 @@ trait ConnectionHandler extends ScalaBusMod with VertxScalaHelpers {
28
28
override def asyncReceive (msg : Message [JsonObject ]) = {
29
29
case " select" => select(msg.body)
30
30
case " insert" => insert(msg.body)
31
- case " prepared" => prepared(msg.body)
31
+ case " prepared" => sendWithPool( prepared(msg.body) )
32
32
case " transaction" => transaction(msg.body)
33
- case " raw" => rawCommand(msg.body.getString(" command" ))
33
+ case " raw" => sendWithPool( rawCommand(msg.body.getString(" command" ) ))
34
34
}
35
35
36
36
def close () = pool.close
@@ -53,7 +53,7 @@ trait ConnectionHandler extends ScalaBusMod with VertxScalaHelpers {
53
53
}
54
54
55
55
protected def select (json : JsonObject ): Future [Reply ] = pool.withConnection({ c : Connection =>
56
- rawCommand(selectCommand(json))
56
+ sendWithPool( rawCommand(selectCommand(json) ))
57
57
})
58
58
59
59
protected def insertCommand (json : JsonObject ): String = {
@@ -74,43 +74,50 @@ trait ConnectionHandler extends ScalaBusMod with VertxScalaHelpers {
74
74
}
75
75
76
76
protected def insert (json : JsonObject ): Future [Reply ] = {
77
- rawCommand(insertCommand(json))
77
+ sendWithPool( rawCommand(insertCommand(json) ))
78
78
}
79
79
80
+ sealed trait CommandType { val query : Connection => Future [QueryResult ] }
81
+ case class Raw (stmt : String ) extends CommandType { val query = rawCommand(stmt) }
82
+ case class Prepared (json : JsonObject ) extends CommandType { val query = prepared(json) }
83
+
80
84
protected def transaction (json : JsonObject ): Future [Reply ] = pool.withConnection({ c : Connection =>
81
85
logger.info(" TRANSACTION-JSON: " + json.encodePrettily())
86
+
82
87
Option (json.getArray(" statements" )) match {
83
- case Some (statements) => rawCommand((statements.asScala.map {
84
- case js : JsonObject =>
85
- js.getString(" action" ) match {
86
- case " select" => selectCommand(js)
87
- case " insert" => insertCommand(js)
88
- case " raw" => js.getString(" command" )
89
- }
90
- case _ => throw new IllegalArgumentException (" 'statements' needs JsonObjects!" )
91
- }).mkString(transactionStart, statementDelimiter, statementDelimiter + transactionEnd))
88
+ case Some (statements) => c.inTransaction { conn : Connection =>
89
+ val futures = (statements.asScala.map {
90
+ case js : JsonObject =>
91
+ js.getString(" action" ) match {
92
+ case " select" => Raw (selectCommand(js))
93
+ case " insert" => Raw (insertCommand(js))
94
+ case " prepared" => Prepared (js)
95
+ case " raw" => Raw (js.getString(" command" ))
96
+ }
97
+ case _ => throw new IllegalArgumentException (" 'statements' needs JsonObjects!" )
98
+ })
99
+ val f = (futures.foldLeft(Future [Any ]()) { case (f, cmd) => f flatMap (_ => cmd.query(conn)) })
100
+ f map (_ => Ok (Json .obj()))
101
+ }
102
+ // .mkString(transactionStart, statementDelimiter, statementDelimiter + transactionEnd))
92
103
case None => throw new IllegalArgumentException (" No 'statements' field in request!" )
93
104
}
94
105
})
95
106
96
- protected def prepared ( json : JsonObject ): Future [Reply ] = pool.withConnection({ c : Connection =>
97
- c.sendPreparedStatement(json.getString( " statement " ), json.getArray( " values " ).toArray() ) map buildResults recover {
107
+ protected def sendWithPool ( fn : Connection => Future [ QueryResult ] ): Future [Reply ] = pool.withConnection({ c : Connection =>
108
+ fn(c ) map buildResults recover {
98
109
case x : GenericDatabaseException =>
99
110
Error (x.errorMessage.message)
100
111
case x =>
101
112
Error (x.getMessage())
102
113
}
103
114
})
104
115
105
- protected def rawCommand (command : String ): Future [Reply ] = pool.withConnection({ c : Connection =>
106
- logger.info(" sending command: " + command)
107
- c.sendQuery(command) map buildResults recover {
108
- case x : GenericDatabaseException =>
109
- Error (x.errorMessage.message)
110
- case x =>
111
- Error (x.getMessage())
112
- }
113
- })
116
+ protected def prepared (json : JsonObject ): Connection => Future [QueryResult ] = { c : Connection =>
117
+ c.sendPreparedStatement(json.getString(" statement" ), json.getArray(" values" ).toArray())
118
+ }
119
+
120
+ protected def rawCommand (command : String ): Connection => Future [QueryResult ] = { c : Connection => c.sendQuery(command) }
114
121
115
122
private def buildResults (qr : QueryResult ): Reply = {
116
123
val result = new JsonObject ()
@@ -123,13 +130,9 @@ trait ConnectionHandler extends ScalaBusMod with VertxScalaHelpers {
123
130
arr.addString(name)
124
131
}
125
132
126
- val rows = Json .arr((for {
127
- rowData <- resultSet
128
- } yield Json .arr((for {
129
- columnName <- resultSet.columnNames
130
- } yield {
131
- rowData(columnName)
132
- }): _* )): _* )
133
+ val rows = (new JsonArray () /: resultSet) { (arr, rowData) =>
134
+ arr.add(rowDataToJsonArray(rowData))
135
+ }
133
136
134
137
result.putArray(" fields" , fields)
135
138
result.putArray(" results" , rows)
@@ -138,4 +141,6 @@ trait ConnectionHandler extends ScalaBusMod with VertxScalaHelpers {
138
141
139
142
Ok (result)
140
143
}
144
+
145
+ private def rowDataToJsonArray (rowData : RowData ): JsonArray = Json .arr(rowData.toList: _* )
141
146
}
0 commit comments