|
1 | 1 | package io.vertx.asyncsql.database
|
2 | 2 |
|
3 | 3 | import scala.collection.JavaConverters.iterableAsScalaIterableConverter
|
4 |
| -import scala.concurrent.Future |
| 4 | +import scala.concurrent.{Promise, Future} |
5 | 5 | import org.vertx.scala.core.json.{JsonElement, JsonArray, JsonObject, Json}
|
6 | 6 | import org.vertx.scala.core.logging.Logger
|
7 | 7 | import com.github.mauricio.async.db.{Configuration, Connection, QueryResult, RowData}
|
@@ -31,71 +31,81 @@ trait ConnectionHandler extends ScalaBusMod {
|
31 | 31 |
|
32 | 32 | def transactionEnd: String = "COMMIT;"
|
33 | 33 |
|
| 34 | + def transactionRollback: String = "ROLLBACK;" |
| 35 | + |
34 | 36 | def statementDelimiter: String = ";"
|
35 | 37 |
|
| 38 | + private def timeout = 500L /* FIXME from config file! */ |
| 39 | + |
36 | 40 | import org.vertx.scala.core.eventbus._
|
37 | 41 |
|
38 | 42 | private def receiver(withConnectionFn: (Connection => Future[SyncReply]) => Future[SyncReply]): Receive = (msg: Message[JsonObject]) => {
|
39 | 43 | def sendAsyncWithPool(fn: Connection => Future[QueryResult]) = AsyncReply(sendWithPool(withConnectionFn)(fn))
|
40 | 44 |
|
41 | 45 | {
|
42 |
| - case "select" => sendAsyncWithPool(rawCommand(selectCommand(msg.body))) |
43 |
| - case "insert" => sendAsyncWithPool(rawCommand(insertCommand(msg.body))) |
44 |
| - case "prepared" => sendAsyncWithPool(prepared(msg.body)) |
45 |
| - case "raw" => sendAsyncWithPool(rawCommand(msg.body.getString("command"))) |
46 |
| - case "transaction" => transaction(withConnectionFn)(msg.body) |
| 46 | + case "select" => sendAsyncWithPool(rawCommand(selectCommand(msg.body()))) |
| 47 | + case "insert" => sendAsyncWithPool(rawCommand(insertCommand(msg.body()))) |
| 48 | + case "prepared" => sendAsyncWithPool(prepared(msg.body())) |
| 49 | + case "raw" => sendAsyncWithPool(rawCommand(msg.body().getString("command"))) |
47 | 50 | }
|
48 | 51 | }
|
49 | 52 |
|
50 |
| - override def receive: Receive = { msg: Message[JsonObject] => |
| 53 | + private def regularReceive: Receive = { msg: Message[JsonObject] => |
51 | 54 | receiver(pool.withConnection)(msg).orElse {
|
52 | 55 | case "start" => startTransaction(msg)
|
| 56 | + case "transaction" => transaction(pool.withConnection)(msg.body()) |
53 | 57 | }
|
54 | 58 | }
|
55 | 59 |
|
| 60 | + override def receive: Receive = regularReceive |
| 61 | + |
56 | 62 |
|
57 | 63 | //------------------
|
58 | 64 | //New transaction stuff
|
59 |
| - //TODO reformat when finished |
| 65 | + private def mapRepliesToTransactionReceive(c: Connection): BusModReply => BusModReply = { |
| 66 | + case AsyncReply(receiveEndFuture) => AsyncReply(receiveEndFuture.map(mapRepliesToTransactionReceive(c))) |
| 67 | + case Ok(v, None) => Ok(v, Some(ReceiverWithTimeout(inTransactionReceive(c), timeout, () => failTransaction(c)))) |
| 68 | + case x => x |
| 69 | + } |
60 | 70 |
|
61 |
| - protected def endTransaction() = { |
62 |
| - /* FIXME */ |
63 |
| - logger.info("ending transaction!") |
| 71 | + private def inTransactionReceive(c: Connection): Receive = { msg: Message[JsonObject] => |
| 72 | + def withConnection[T](fn: Connection => Future[T]): Future[T] = fn(c) |
| 73 | + |
| 74 | + receiver(withConnection)(msg).andThen({ |
| 75 | + case x: BusModReply => mapRepliesToTransactionReceive(c)(x) |
| 76 | + case x => x |
| 77 | + }).orElse { |
| 78 | + case "end" => endTransaction(c) |
| 79 | + } |
64 | 80 | }
|
65 | 81 |
|
66 | 82 | protected def startTransaction(msg: Message[JsonObject]) = AsyncReply {
|
67 |
| - pool.withConnection({ c => |
68 |
| - def withConnection[T](fn: Connection => Future[T]): Future[T] = fn(c) |
69 |
| - |
| 83 | + pool.take().flatMap { c => |
70 | 84 | c.sendQuery(transactionStart) map { _ =>
|
71 |
| - Ok(Json.obj(), Some(ReceiverWithTimeout(transactionQueryReply(withConnection), 500L /* FIXME from config file! */ , endTransaction))) |
| 85 | + Ok(Json.obj(), Some(ReceiverWithTimeout(inTransactionReceive(c), timeout, () => failTransaction(c)))) |
72 | 86 | }
|
73 |
| - }) |
| 87 | + } |
74 | 88 | }
|
75 | 89 |
|
76 |
| - private def transactionQueryReply(withConnection: (Connection => Future[SyncReply]) => Future[SyncReply]): Receive = { msg => |
77 |
| - val action = msg.body.getString("action") |
| 90 | + protected def failTransaction(c: Connection) = { |
| 91 | + logger.info("NO REPLY BACK -> FAIL TRANSACTION!") |
| 92 | + c.sendQuery(transactionRollback).andThen({ |
| 93 | + case _ => pool.giveBack(c) |
| 94 | + }) |
| 95 | + } |
78 | 96 |
|
79 |
| - receiver(withConnection)(msg).orElse{ |
80 |
| - case "start" => Error("cannot send 'start' action when inside of transaction!") |
81 |
| - case "end" => |
82 |
| - logger.info("got action end!") |
83 |
| - // AsyncReply{withConnection} |
| 97 | + protected def endTransaction(c: Connection) = { |
| 98 | + logger.info("ending transaction!") |
| 99 | + AsyncReply { |
| 100 | + (for { |
| 101 | + qr <- c.sendQuery(transactionEnd) |
| 102 | + _ <- pool.giveBack(c) |
| 103 | + } yield { |
84 | 104 | Ok()
|
| 105 | + }) recover { |
| 106 | + case ex => Error("Could not give back connection to pool", "CONNECTION_POOL_EXCEPTION", Json.obj("exception" -> ex)) |
| 107 | + } |
85 | 108 | }
|
86 |
| - |
87 |
| - val opt = pf.lift(action).map({ |
88 |
| - case Ok(v, None) => Ok(v, Some(ReceiverWithTimeout(transactionQueryReply(withConnection), 500L /* FIXME from config file! */ , endTransaction))) |
89 |
| - case x => x |
90 |
| - }: Function[BusModReceiveEnd, BusModReceiveEnd]) |
91 |
| - |
92 |
| - opt.getOrElse { |
93 |
| - case "start" => Error("cannot send 'start' action when inside of transaction!") |
94 |
| - case "end" => |
95 |
| - logger.info("got action end!") |
96 |
| - // AsyncReply{withConnection} |
97 |
| - Ok() |
98 |
| - }: PartialFunction[String, BusModReceiveEnd] |
99 | 109 | }
|
100 | 110 |
|
101 | 111 | //------------------
|
|
0 commit comments