Skip to content

Commit ac8382d

Browse files
committed
Merge pull request mauricio#63 from dylex/mutex
Connection mutex improvements for issue mauricio#59
2 parents 23f407c + f754c1b commit ac8382d

File tree

3 files changed

+47
-50
lines changed

3 files changed

+47
-50
lines changed

db-async-common/src/main/scala/com/github/mauricio/async/db/exceptions/ConnectionStillRunningQueryException.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616

1717
package com.github.mauricio.async.db.exceptions
1818

19-
class ConnectionStillRunningQueryException( connectionCount : Long, readyForQuery : Boolean )
20-
extends DatabaseException ( "[%s] - There is a query still being run here - readyForQuery -> %s".format(
19+
class ConnectionStillRunningQueryException( connectionCount : Long, caughtRace : Boolean)
20+
extends DatabaseException ( "[%s] - There is a query still being run here - race -> %s".format(
2121
connectionCount,
22-
readyForQuery
22+
caughtRace
2323
))

mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/MySQLConnection.scala

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import com.github.mauricio.async.db.mysql.message.server._
2525
import com.github.mauricio.async.db.mysql.util.CharsetMapper
2626
import com.github.mauricio.async.db.util.ChannelFutureTransformer.toFuture
2727
import com.github.mauricio.async.db.util._
28-
import java.util.concurrent.atomic.AtomicLong
28+
import java.util.concurrent.atomic.{AtomicLong,AtomicReference}
2929
import scala.Some
3030
import scala.concurrent.{ExecutionContext, Promise, Future}
3131
import scala.util.Failure
@@ -78,7 +78,7 @@ class MySQLConnection(
7878
private final val connectionPromise = Promise[Connection]()
7979
private final val disconnectionPromise = Promise[Connection]()
8080

81-
private var queryPromise: Promise[QueryResult] = null
81+
private val queryPromiseReference = new AtomicReference[Option[Promise[QueryResult]]](None)
8282
private var connected = false
8383
private var _lastException : Throwable = null
8484
private var serverVersion : Version = null
@@ -185,33 +185,28 @@ class MySQLConnection(
185185
def sendQuery(query: String): Future[QueryResult] = {
186186
this.validateIsReadyForQuery()
187187
val promise = Promise[QueryResult]
188-
this.queryPromise = promise
188+
this.setQueryPromise(promise)
189189
this.connectionHandler.write(new QueryMessage(query))
190190
promise.future
191191
}
192192

193193
private def failQueryPromise(t: Throwable) {
194194

195-
if (this.isQuerying) {
196-
val promise = this.queryPromise
197-
this.queryPromise = null
198-
199-
promise.tryFailure(t)
195+
this.clearQueryPromise.foreach {
196+
_.tryFailure(t)
200197
}
201198

202199
}
203200

204201
private def succeedQueryPromise(queryResult: QueryResult) {
205202

206-
if (this.isQuerying) {
207-
val promise = this.queryPromise
208-
this.queryPromise = null
209-
promise.success(queryResult)
203+
this.clearQueryPromise.foreach {
204+
_.success(queryResult)
210205
}
211206

212207
}
213208

214-
def isQuerying: Boolean = this.queryPromise != null && !this.queryPromise.isCompleted
209+
def isQuerying: Boolean = this.queryPromise.isDefined
215210

216211
def onResultSet(resultSet: ResultSet, message: EOFMessage) {
217212
if (this.isQuerying) {
@@ -239,15 +234,25 @@ class MySQLConnection(
239234
throw new InsufficientParametersException(totalParameters, values)
240235
}
241236
val promise = Promise[QueryResult]
242-
this.queryPromise = promise
237+
this.setQueryPromise(promise)
243238
this.connectionHandler.write(new PreparedStatementMessage(query, values))
244239
promise.future
245240
}
246241

247242
private def validateIsReadyForQuery() {
248-
if ( this.queryPromise != null && !this.queryPromise.isCompleted ) {
249-
throw new ConnectionStillRunningQueryException(this.connectionCount, false )
243+
if ( isQuerying ) {
244+
throw new ConnectionStillRunningQueryException(this.connectionCount, false)
250245
}
251246
}
252247

248+
private def queryPromise: Option[Promise[QueryResult]] = queryPromiseReference.get()
249+
250+
private def setQueryPromise(promise: Promise[QueryResult]) {
251+
if (!this.queryPromiseReference.compareAndSet(None, Some(promise)))
252+
throw new ConnectionStillRunningQueryException(this.connectionCount, true)
253+
}
254+
255+
private def clearQueryPromise : Option[Promise[QueryResult]] = {
256+
this.queryPromiseReference.getAndSet(None)
257+
}
253258
}

postgresql-async/src/main/scala/com/github/mauricio/async/db/postgresql/PostgreSQLConnection.scala

Lines changed: 23 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import com.github.mauricio.async.db.postgresql.column.{PostgreSQLColumnDecoderRe
2525
import com.github.mauricio.async.db.postgresql.exceptions._
2626
import com.github.mauricio.async.db.util._
2727
import com.github.mauricio.async.db.{Configuration, Connection}
28-
import java.util.concurrent.atomic._
28+
import java.util.concurrent.atomic.{AtomicLong,AtomicInteger,AtomicReference}
2929
import messages.backend._
3030
import messages.frontend._
3131
import scala.Some
@@ -65,7 +65,6 @@ class PostgreSQLConnection
6565
private final val preparedStatementsCounter = new AtomicInteger()
6666
private final implicit val internalExecutionContext = executionContext
6767

68-
private var readyForQuery = false
6968
private val parameterStatus = new scala.collection.mutable.HashMap[String, String]()
7069
private val parsedStatements = new scala.collection.mutable.HashMap[String, PreparedStatementHolder]()
7170
private var authenticated = false
@@ -80,7 +79,7 @@ class PostgreSQLConnection
8079

8180
private var queryResult: Option[QueryResult] = None
8281

83-
def isReadyForQuery: Boolean = this.readyForQuery
82+
def isReadyForQuery: Boolean = this.queryPromise.isEmpty
8483

8584
def connect: Future[Connection] = {
8685
this.connectionHandler.connect.onFailure {
@@ -98,7 +97,6 @@ class PostgreSQLConnection
9897

9998
override def sendQuery(query: String): Future[QueryResult] = {
10099
validateQuery(query)
101-
this.readyForQuery = false
102100

103101
val promise = Promise[QueryResult]()
104102
this.setQueryPromise(promise)
@@ -132,7 +130,6 @@ class PostgreSQLConnection
132130
throw new InsufficientParametersException(paramsCount, values)
133131
}
134132

135-
this.readyForQuery = false
136133
val promise = Promise[QueryResult]()
137134
this.setQueryPromise(promise)
138135
this.currentPreparedStatement = Some(query)
@@ -172,19 +169,15 @@ class PostgreSQLConnection
172169
this.disconnect
173170
}
174171

175-
this.failQueryPromise(e)
176-
177172
this.currentPreparedStatement = None
173+
this.failQueryPromise(e)
178174
}
179175

180176
override def onReadyForQuery() {
181177
this.connectionFuture.trySuccess(this)
182178

183-
queryResult.map(this.succeedQueryPromise)
184-
185-
this.queryResult = None
186179
this.recentError = false
187-
this.readyForQuery = true
180+
queryResult.foreach(this.succeedQueryPromise)
188181
}
189182

190183
override def onError(m: ErrorMessage) {
@@ -269,15 +262,18 @@ class PostgreSQLConnection
269262
authenticationMessage.challengeType)
270263
}
271264
}
265+
266+
private[this] def notReadyForQueryError(errorMessage : String, race : Boolean) = {
267+
log.error(errorMessage)
268+
throw new ConnectionStillRunningQueryException(
269+
this.currentCount,
270+
race
271+
)
272+
}
272273

273274
def validateIfItIsReadyForQuery(errorMessage: String) =
274-
if (this.queryPromise.isDefined) {
275-
log.error(errorMessage)
276-
throw new ConnectionStillRunningQueryException(
277-
this.currentCount,
278-
this.readyForQuery
279-
)
280-
}
275+
if (this.queryPromise.isDefined)
276+
notReadyForQueryError(errorMessage, false)
281277

282278
private def validateQuery(query: String) {
283279
this.validateIfItIsReadyForQuery("Can't run query because there is one query pending already")
@@ -290,29 +286,25 @@ class PostgreSQLConnection
290286
private def queryPromise: Option[Promise[QueryResult]] = queryPromiseReference.get()
291287

292288
private def setQueryPromise(promise: Promise[QueryResult]) {
293-
this.queryPromiseReference.set(Some(promise))
289+
if (!this.queryPromiseReference.compareAndSet(None, Some(promise)))
290+
notReadyForQueryError("Can't run query due to a race with another started query", true)
294291
}
295292

296-
private def clearQueryPromise {
297-
this.queryPromiseReference.set(None)
293+
private def clearQueryPromise : Option[Promise[QueryResult]] = {
294+
this.queryPromiseReference.getAndSet(None)
298295
}
299296

300297
private def failQueryPromise(t: Throwable) {
301-
val promise = this.queryPromise
302-
303-
if (promise.isDefined) {
304-
this.clearQueryPromise
298+
this.clearQueryPromise.foreach { promise =>
305299
log.error("Setting error on future {}", promise)
306-
promise.get.failure(t)
300+
promise.failure(t)
307301
}
308302
}
309303

310304
private def succeedQueryPromise(result: QueryResult) {
311-
val promise = this.queryPromise
312-
313-
if (promise.isDefined) {
314-
this.clearQueryPromise
315-
promise.get.success(result)
305+
this.queryResult = None
306+
this.clearQueryPromise.foreach {
307+
_.success(result)
316308
}
317309
}
318310

0 commit comments

Comments
 (0)